Skip to content

Commit

Permalink
[RSDK-8885] Allow DiscoverComponents API to play well with modules (v…
Browse files Browse the repository at this point in the history
…iamrobotics#4410)

Co-authored-by: Maxim Pertsov <[email protected]>
  • Loading branch information
hexbabe and maximpertsov authored Oct 4, 2024
1 parent e1875d6 commit c12e8c0
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 3 deletions.
28 changes: 26 additions & 2 deletions module/modmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap/zapcore"
pb "go.viam.com/api/module/v1"
robotpb "go.viam.com/api/robot/v1"
"go.viam.com/utils"
"go.viam.com/utils/pexec"
"go.viam.com/utils/rpc"
Expand Down Expand Up @@ -75,8 +76,11 @@ type module struct {
handles modlib.HandlerMap
sharedConn rdkgrpc.SharedConn
client pb.ModuleServiceClient
addr string
resources map[resource.Name]*addedResource
// robotClient supplements the ModuleServiceClient client to serve select robot level methods from the module server
// such as the DiscoverComponents API
robotClient robotpb.RobotServiceClient
addr string
resources map[resource.Name]*addedResource
// resourcesMu must be held if the `resources` field is accessed without
// write-locking the module manager.
resourcesMu sync.Mutex
Expand Down Expand Up @@ -991,6 +995,7 @@ func (m *module) dial() error {
// out.
m.sharedConn.ResetConn(rpc.GrpcOverHTTPClientConn{ClientConn: conn}, m.logger)
m.client = pb.NewModuleServiceClient(m.sharedConn.GrpcConn())
m.robotClient = robotpb.NewRobotServiceClient(m.sharedConn.GrpcConn())
return nil
}

Expand Down Expand Up @@ -1163,6 +1168,10 @@ func (m *module) registerResources(mgr modmaninterface.ModuleManager, logger log
case api.API.IsComponent():
for _, model := range models {
logger.Infow("Registering component API and model from module", "module", m.cfg.Name, "API", api.API, "model", model)
// We must copy because the Discover closure func relies on api and model, but they are iterators and mutate.
// Copying prevents mutation.
modelCopy := model
apiCopy := api
resource.RegisterComponent(api.API, model, resource.Registration[resource.Resource, resource.NoNativeConfig]{
Constructor: func(
ctx context.Context,
Expand All @@ -1172,6 +1181,21 @@ func (m *module) registerResources(mgr modmaninterface.ModuleManager, logger log
) (resource.Resource, error) {
return mgr.AddResource(ctx, conf, DepsToNames(deps))
},
Discover: func(ctx context.Context, logger logging.Logger) (interface{}, error) {
req := &robotpb.DiscoverComponentsRequest{
Queries: []*robotpb.DiscoveryQuery{
{Subtype: apiCopy.API.String(), Model: modelCopy.String()},
},
}

res, err := m.robotClient.DiscoverComponents(ctx, req)
if err != nil {
m.logger.Errorf("error in modular DiscoverComponents: %s", err)
return nil, err
}

return res, nil
},
})
}
case api.API.IsService():
Expand Down
64 changes: 64 additions & 0 deletions module/modmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package modmanager

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -32,6 +33,20 @@ import (
rutils "go.viam.com/rdk/utils"
)

type testDiscoveryResult struct {
Discovery []testDiscoveryItem `json:"discovery"`
}

type testDiscoveryItem struct {
Query testDiscoveryQuery `json:"query"`
Results map[string]string `json:"results"`
}

type testDiscoveryQuery struct {
Subtype string `json:"subtype"`
Model string `json:"model"`
}

func setupSocketWithRobot(t *testing.T) string {
t.Helper()

Expand Down Expand Up @@ -1322,3 +1337,52 @@ func TestBadModuleFailsFast(t *testing.T) {

test.That(t, err.Error(), test.ShouldContainSubstring, "module test-module exited too quickly after attempted startup")
}

func TestModularDiscovery(t *testing.T) {
ctx := context.Background()
logger := logging.NewTestLogger(t)

modPath := rtestutils.BuildTempModule(t, "module/testmodule")

modCfg := config.Module{
Name: "test-module",
ExePath: modPath,
}

parentAddr := setupSocketWithRobot(t)

mgr := setupModManager(t, ctx, parentAddr, logger, modmanageroptions.Options{UntrustedEnv: false})

err := mgr.Add(ctx, modCfg)
test.That(t, err, test.ShouldBeNil)

// The helper model implements actual (foobar) discovery
reg, ok := resource.LookupRegistration(generic.API, resource.NewModel("rdk", "test", "helper"))
test.That(t, ok, test.ShouldBeTrue)
test.That(t, reg, test.ShouldNotBeNil)

// Check that the Discover function is registered and make call
test.That(t, reg.Discover, test.ShouldNotBeNil)
result, err := reg.Discover(ctx, logger)
test.That(t, err, test.ShouldBeNil)
t.Log("Discovery result: ", result)

// Format result
jsonData, err := json.Marshal(result)
test.That(t, err, test.ShouldBeNil)
// Debug: print the JSON data
t.Logf("Raw JSON: %s", string(jsonData))

var discoveryResult testDiscoveryResult
err = json.Unmarshal(jsonData, &discoveryResult)
test.That(t, err, test.ShouldBeNil)
// Debug: print the casted struct
t.Logf("Casted struct: %+v", discoveryResult)

// Test fields
test.That(t, len(discoveryResult.Discovery), test.ShouldEqual, 1)
discovery := discoveryResult.Discovery[0]
test.That(t, discovery.Query.Subtype, test.ShouldEqual, "rdk:component:generic")
test.That(t, discovery.Query.Model, test.ShouldEqual, "rdk:test:helper")
test.That(t, discovery.Results["foo"], test.ShouldEqual, "bar")
}
65 changes: 65 additions & 0 deletions module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"os"
"path/filepath"
"strings"
"sync"
"time"

Expand All @@ -24,6 +25,7 @@ import (
robotpb "go.viam.com/api/robot/v1"
streampb "go.viam.com/api/stream/v1"
"go.viam.com/utils"
vprotoutils "go.viam.com/utils/protoutils"
"go.viam.com/utils/rpc"
"golang.org/x/exp/maps"
"google.golang.org/grpc"
Expand Down Expand Up @@ -191,6 +193,7 @@ type Module struct {
pcFailed <-chan struct{}
pb.UnimplementedModuleServiceServer
streampb.UnimplementedStreamServiceServer
robotpb.UnimplementedRobotServiceServer
}

// NewModule returns the basic module framework/structure.
Expand Down Expand Up @@ -230,6 +233,11 @@ func NewModule(ctx context.Context, address string, logger logging.Logger) (*Mod
if err := m.server.RegisterServiceServer(ctx, &streampb.StreamService_ServiceDesc, m); err != nil {
return nil, err
}
// We register the RobotService API to supplement the ModuleService in order to serve select robot level methods from the module server
// such as the DiscoverComponents API
if err := m.server.RegisterServiceServer(ctx, &robotpb.RobotService_ServiceDesc, m); err != nil {
return nil, err
}

// attempt to construct a PeerConnection
pc, err := rgrpc.NewLocalPeerConnection(logger)
Expand Down Expand Up @@ -506,6 +514,63 @@ func (m *Module) AddResource(ctx context.Context, req *pb.AddResourceRequest) (*
return &pb.AddResourceResponse{}, nil
}

// DiscoverComponents takes a list of discovery queries and returns corresponding
// component configurations.
func (m *Module) DiscoverComponents(
ctx context.Context,
req *robotpb.DiscoverComponentsRequest,
) (*robotpb.DiscoverComponentsResponse, error) {
var discoveries []*robotpb.Discovery

for _, q := range req.Queries {
// Handle triplet edge case i.e. if the subtype doesn't contain ':', add the "rdk:component:" prefix
if !strings.ContainsRune(q.Subtype, ':') {
q.Subtype = "rdk:component:" + q.Subtype
}

api, err := resource.NewAPIFromString(q.Subtype)
if err != nil {
return nil, fmt.Errorf("invalid subtype: %s: %w", q.Subtype, err)
}
model, err := resource.NewModelFromString(q.Model)
if err != nil {
return nil, fmt.Errorf("invalid model: %s: %w", q.Model, err)
}

resInfo, ok := resource.LookupRegistration(api, model)
if !ok {
return nil, fmt.Errorf("no registration found for API %s and model %s", api, model)
}

if resInfo.Discover == nil {
return nil, fmt.Errorf("discovery not supported for API %s and model %s", api, model)
}

results, err := resInfo.Discover(ctx, m.logger)
if err != nil {
return nil, fmt.Errorf("error discovering components for API %s and model %s: %w", api, model, err)
}
if results == nil {
return nil, fmt.Errorf("error discovering components for API %s and model %s: results was nil", api, model)
}

pbResults, err := vprotoutils.StructToStructPb(results)
if err != nil {
return nil, fmt.Errorf("unable to convert discovery results to pb struct for query %v: %w", q, err)
}

pbDiscovery := &robotpb.Discovery{
Query: q,
Results: pbResults,
}
discoveries = append(discoveries, pbDiscovery)
}

return &robotpb.DiscoverComponentsResponse{
Discovery: discoveries,
}, nil
}

// ReconfigureResource receives the component/service configuration from the parent.
func (m *Module) ReconfigureResource(ctx context.Context, req *pb.ReconfigureResourceRequest) (*pb.ReconfigureResourceResponse, error) {
var res resource.Resource
Expand Down
9 changes: 8 additions & 1 deletion module/testmodule/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@ func mainWithArgs(ctx context.Context, args []string, logger logging.Logger) err
resource.RegisterComponent(
generic.API,
helperModel,
resource.Registration[resource.Resource, resource.NoNativeConfig]{Constructor: newHelper})
resource.Registration[resource.Resource, resource.NoNativeConfig]{
Constructor: newHelper,
Discover: func(ctx context.Context, logger logging.Logger) (interface{}, error) {
return map[string]string{
"foo": "bar",
}, nil
},
})
err = myMod.AddModelFromRegistry(ctx, generic.API, helperModel)
if err != nil {
return err
Expand Down

0 comments on commit c12e8c0

Please sign in to comment.