Skip to content

Commit

Permalink
[DATA-3338] fix capture all from camera stability
Browse files Browse the repository at this point in the history
  • Loading branch information
nicksanford committed Nov 21, 2024
1 parent baf6c50 commit 583a8e6
Show file tree
Hide file tree
Showing 41 changed files with 2,591 additions and 834 deletions.
25 changes: 15 additions & 10 deletions components/arm/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package arm
import (
"context"
"errors"
"time"

v1 "go.viam.com/api/common/v1"
pb "go.viam.com/api/component/arm/v1"
Expand Down Expand Up @@ -39,18 +40,20 @@ func newEndPositionCollector(resource interface{}, params data.CollectorParams)
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
v, err := arm.EndPosition(ctx, data.FromDMExtraMap)
if err != nil {
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, endPosition.String(), err)
return res, data.FailedToReadErr(params.ComponentName, endPosition.String(), err)
}
o := v.Orientation().OrientationVectorDegrees()
return pb.GetEndPositionResponse{
return data.NewTabularCaptureResult(timeRequested, pb.GetEndPositionResponse{
Pose: &v1.Pose{
X: v.Point().X,
Y: v.Point().Y,
Expand All @@ -60,7 +63,7 @@ func newEndPositionCollector(resource interface{}, params data.CollectorParams)
OZ: o.OZ,
Theta: o.Theta,
},
}, nil
})
})
return data.NewCollector(cFunc, params)
}
Expand All @@ -73,21 +76,23 @@ func newJointPositionsCollector(resource interface{}, params data.CollectorParam
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
v, err := arm.JointPositions(ctx, data.FromDMExtraMap)
if err != nil {
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
return res, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
}
jp, err := referenceframe.JointPositionsFromInputs(arm.ModelFrame(), v)
if err != nil {
return nil, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
return res, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
}
return pb.GetJointPositionsResponse{Positions: jp}, nil
return data.NewTabularCaptureResult(timeRequested, pb.GetJointPositionsResponse{Positions: jp})
})
return data.NewCollector(cFunc, params)
}
Expand Down
13 changes: 7 additions & 6 deletions components/arm/collectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func TestCollectors(t *testing.T) {
tests := []struct {
name string
collector data.CollectorConstructor
expected *datasyncpb.SensorData
expected []*datasyncpb.SensorData
}{
{
name: "End position collector should write a pose",
collector: arm.NewEndPositionCollector,
expected: &datasyncpb.SensorData{
expected: []*datasyncpb.SensorData{{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: tu.ToStructPBStruct(t, map[string]any{
"pose": map[string]any{
Expand All @@ -49,27 +49,28 @@ func TestCollectors(t *testing.T) {
"z": 3,
},
})},
},
}},
},
{
name: "Joint positions collector should write a list of positions",
collector: arm.NewJointPositionsCollector,
expected: &datasyncpb.SensorData{
expected: []*datasyncpb.SensorData{{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: tu.ToStructPBStruct(t, map[string]any{
"positions": map[string]any{
"values": []any{1.0, 2.0, 3.0},
},
})},
},
}},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
start := time.Now()
buf := tu.NewMockBuffer()
buf := tu.NewMockBuffer(t)
params := data.CollectorParams{
DataType: data.CaptureTypeTabular,
ComponentName: componentName,
Interval: captureInterval,
Logger: logging.NewTestLogger(t),
Expand Down
30 changes: 18 additions & 12 deletions components/board/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package board

import (
"context"
"time"

"github.com/pkg/errors"
pb "go.viam.com/api/component/board/v1"
Expand Down Expand Up @@ -39,10 +40,12 @@ func newAnalogCollector(resource interface{}, params data.CollectorParams) (data
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
var analogValue AnalogValue
if _, ok := arg[analogReaderNameKey]; !ok {
return nil, data.FailedToReadErr(params.ComponentName, analogs.String(),
return res, data.FailedToReadErr(params.ComponentName, analogs.String(),
errors.New("Must supply reader_name in additional_params for analog collector"))
}
if reader, err := board.AnalogByName(arg[analogReaderNameKey].String()); err == nil {
Expand All @@ -51,17 +54,18 @@ func newAnalogCollector(resource interface{}, params data.CollectorParams) (data
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, analogs.String(), err)
return res, data.FailedToReadErr(params.ComponentName, analogs.String(), err)
}
}
return pb.ReadAnalogReaderResponse{

return data.NewTabularCaptureResult(timeRequested, pb.ReadAnalogReaderResponse{
Value: int32(analogValue.Value),
MinRange: analogValue.Min,
MaxRange: analogValue.Max,
StepSize: analogValue.StepSize,
}, nil
})
})
return data.NewCollector(cFunc, params)
}
Expand All @@ -74,10 +78,12 @@ func newGPIOCollector(resource interface{}, params data.CollectorParams) (data.C
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
var value bool
if _, ok := arg[gpioPinNameKey]; !ok {
return nil, data.FailedToReadErr(params.ComponentName, gpios.String(),
return res, data.FailedToReadErr(params.ComponentName, gpios.String(),
errors.New("Must supply pin_name in additional params for gpio collector"))
}
if gpio, err := board.GPIOPinByName(arg[gpioPinNameKey].String()); err == nil {
Expand All @@ -86,14 +92,14 @@ func newGPIOCollector(resource interface{}, params data.CollectorParams) (data.C
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, gpios.String(), err)
return res, data.FailedToReadErr(params.ComponentName, gpios.String(), err)
}
}
return pb.GetGPIOResponse{
return data.NewTabularCaptureResult(timeRequested, pb.GetGPIOResponse{
High: value,
}, nil
})
})
return data.NewCollector(cFunc, params)
}
Expand Down
14 changes: 8 additions & 6 deletions components/board/collectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ func TestCollectors(t *testing.T) {
name string
params data.CollectorParams
collector data.CollectorConstructor
expected *datasyncpb.SensorData
expected []*datasyncpb.SensorData
}{
{
name: "Board analog collector should write an analog response",
params: data.CollectorParams{
DataType: data.CaptureTypeTabular,
ComponentName: componentName,
Interval: captureInterval,
Logger: logging.NewTestLogger(t),
Expand All @@ -43,19 +44,20 @@ func TestCollectors(t *testing.T) {
},
},
collector: board.NewAnalogCollector,
expected: &datasyncpb.SensorData{
expected: []*datasyncpb.SensorData{{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: tu.ToStructPBStruct(t, map[string]any{
"value": 1,
"min_range": 0,
"max_range": 10,
"step_size": float64(float32(0.1)),
})},
},
}},
},
{
name: "Board gpio collector should write a gpio response",
params: data.CollectorParams{
DataType: data.CaptureTypeTabular,
ComponentName: componentName,
Interval: captureInterval,
Logger: logging.NewTestLogger(t),
Expand All @@ -64,19 +66,19 @@ func TestCollectors(t *testing.T) {
},
},
collector: board.NewGPIOCollector,
expected: &datasyncpb.SensorData{
expected: []*datasyncpb.SensorData{{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: tu.ToStructPBStruct(t, map[string]any{
"high": true,
})},
},
}},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
start := time.Now()
buf := tu.NewMockBuffer()
buf := tu.NewMockBuffer(t)
tc.params.Clock = clock.New()
tc.params.Target = buf

Expand Down
Loading

0 comments on commit 583a8e6

Please sign in to comment.