Skip to content

Commit

Permalink
Merge pull request #394 from ben-mays/master
Browse files Browse the repository at this point in the history
Parameterizing yarpc transport
  • Loading branch information
ben-mays authored Jul 23, 2019
2 parents b297b71 + dca8e14 commit 3860a88
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 35 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Changelog

## v3.4.8 (unreleased)
- Nothing changed yet
- Add transport parameter to yarpc connector and cli

## v3.4.7 (2019-07-22)
- Add String() method to all components of the routing connector
Expand Down
3 changes: 2 additions & 1 deletion cmd/dosa/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func (b BuildInfo) Execute(args []string) error {
type GlobalOptions struct {
Host string `long:"host" default:"127.0.0.1" description:"The hostname or IP for the gateway."`
Port string `short:"p" long:"port" default:"21300" description:"The hostname or IP for the gateway."`
ServiceName string `short:"s" long:"service" default:"dosa-gateway" description:"The TChannel service name for the gateway."`
ServiceName string `short:"s" long:"service" default:"dosa-gateway" description:"The service name for the gateway."`
Transport string `long:"transport" default:"tchannel" description:"The transport to use when connecting to the gateway. Valid options: tchannel, http."`
CallerName callerFlag `long:"caller" default:"dosacli-$USER" description:"The RPC Caller name."`
Timeout timeFlag `long:"timeout" default:"60s" description:"The timeout for gateway requests. E.g., 100ms, 0.5s, 1s. If no unit is specified, milliseconds are assumed."`
Version bool `long:"version" description:"Display version info"`
Expand Down
1 change: 1 addition & 0 deletions cmd/dosa/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func provideShellQueryClient(opts GlobalOptions, scope, prefix, path, structName
Port: opts.Port,
CallerName: opts.CallerName.String(),
ServiceName: opts.ServiceName,
Transport: opts.Transport,
ExtraHeaders: getAuthHeaders(),
}

Expand Down
83 changes: 71 additions & 12 deletions connectors/yarpc/yarpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,32 @@ import (
"context"
"encoding/json"
"fmt"
"net/url"
"strings"

"crypto/tls"

"github.com/pkg/errors"
"github.com/uber-go/dosa"
dosarpc "github.com/uber/dosa-idl/.gen/dosa"
"github.com/uber/dosa-idl/.gen/dosa/dosaclient"
rpc "go.uber.org/yarpc"
"go.uber.org/yarpc/peer"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/http"
"go.uber.org/yarpc/transport/tchannel"
"google.golang.org/grpc/credentials"
)

const (
_version = "version"
errCodeNotFound int32 = 404
errCodeAlreadyExists int32 = 409
errConnectionRefused string = "getsockopt: connection refused"
httpTransport string = "http"
tchannelTransport string = "tchannel"
grpcTransport string = "grpc"
)

// ErrConnectionRefused is used to help deliver a better error message when
Expand All @@ -63,6 +74,7 @@ type Config struct {
Port string `yaml:"port"`
CallerName string `yaml:"callerName"`
ServiceName string `yaml:"serviceName"`
Transport string `yaml:"transport"`
ExtraHeaders map[string]string
}

Expand All @@ -73,14 +85,66 @@ type Connector struct {
headers map[string]string
}

func buildYARPCConfig(config Config) (rpc.Config, error) {
ycfg := rpc.Config{Name: config.CallerName}
hostPort := fmt.Sprintf("%s:%s", config.Host, config.Port)
switch strings.ToLower(config.Transport) {
case tchannelTransport:
// this looks wrong, BUT since it's a uni-directional tchannel
// connection, we have to pass CallerName as the tchannel "ServiceName"
// for source/destination to be reported correctly by RPC layer.
ts, err := tchannel.NewChannelTransport(tchannel.ServiceName(config.CallerName))
if err != nil {
return ycfg, err
}
ycfg.Outbounds = rpc.Outbounds{
config.ServiceName: {
Unary: ts.NewSingleOutbound(hostPort),
},
}
case httpTransport:
// a http host can contain a path segement after the port, breaking the simplified host+port parsing.
uri, err := url.Parse(config.Host)
if err != nil {
return ycfg, errors.New("invalid host given to yarpc connector")
}
// set the port if not specified in the host, if not set, will default to either :80 or :443 based on scheme.
if uri.Port() == "" && config.Port != "" {
uri.Host = fmt.Sprintf("%s:%s", uri.Host, config.Port)
}
ts := http.NewTransport()
ycfg.Outbounds = rpc.Outbounds{
config.ServiceName: {
Unary: ts.NewSingleOutbound(uri.String()),
},
}
case grpcTransport:
tc := credentials.NewTLS(&tls.Config{})
ts := grpc.NewTransport()
chooser := peer.NewSingle(
hostport.Identify(hostPort),
ts.NewDialer(grpc.DialerCredentials(tc)),
)
ycfg.Outbounds = rpc.Outbounds{
config.ServiceName: {
Unary: ts.NewOutbound(chooser),
},
}
default:
return ycfg, errors.New("invalid transport given to yarpc connector")
}
return ycfg, nil
}

// NewConnector creates a new instance with user provided transport
func NewConnector(config Config) (*Connector, error) {
// Ensure host, port, serviceName, and callerName are all specified.
if config.Host == "" {
return nil, errors.New("no host specified")
}

if config.Port == "" {
// port is optional for http transports
if config.Port == "" && config.Transport != httpTransport {
return nil, errors.New("no port specified")
}

Expand All @@ -92,20 +156,15 @@ func NewConnector(config Config) (*Connector, error) {
return nil, errors.New("no callerName specified")
}

ycfg := rpc.Config{Name: config.CallerName}
hostPort := fmt.Sprintf("%s:%s", config.Host, config.Port)
// this looks wrong, BUT since it's a uni-directional tchannel
// connection, we have to pass CallerName as the tchannel "ServiceName"
// for source/destination to be reported correctly by RPC layer.
ts, err := tchannel.NewChannelTransport(tchannel.ServiceName(config.CallerName))
// preserve legacy behavior by defaulting to tchannel
if config.Transport == "" {
config.Transport = tchannelTransport
}

ycfg, err := buildYARPCConfig(config)
if err != nil {
return nil, err
}
ycfg.Outbounds = rpc.Outbounds{
config.ServiceName: {
Unary: ts.NewSingleOutbound(hostPort),
},
}

// important to note that this will panic if config contains invalid
// values such as service name containing invalid characters
Expand Down
38 changes: 38 additions & 0 deletions connectors/yarpc/yarpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,3 +1119,41 @@ func TestPanic(t *testing.T) {
sut.ScopeExists(ctx, "")
})
}

func TestYarpcBuilder(t *testing.T) {
tests := map[string]struct {
cfg Config
err error
}{
"valid http": {
cfg: Config{ServiceName: "dosa", Host: "http://dosa.uberinternal.com:9090", Transport: httpTransport},
},
"valid http with port": {
cfg: Config{ServiceName: "dosa", Host: "http://dosa.uberinternal.com", Port: "9090", Transport: httpTransport},
},
"valid tchannel": {
cfg: Config{ServiceName: "dosa", Host: "http://dosa.uberinternal.com", Port: "12001", Transport: tchannelTransport, CallerName: "test"},
},
"valid grpc": {
cfg: Config{ServiceName: "dosa", Host: "http://dosa.uberinternal.com", Transport: grpcTransport},
},
"invalid transport": {
cfg: Config{ServiceName: "dosa", Host: "http://dosa.uberinternal.com", Transport: "fake"},
err: errors.New("invalid transport"),
},
}

for name, tt := range tests {
t.Run(name, func(t *testing.T) {
res, err := buildYARPCConfig(tt.cfg)
if tt.err != nil {
assert.NotNil(t, err, "expected err but got nil")
assert.Error(t, tt.err, err.Error(), "expected err and received err not equal")
} else {
assert.NotNil(t, res)
assert.NotNil(t, res.Outbounds)
assert.NotNil(t, res.Outbounds["dosa"])
}
})
}
}
Loading

0 comments on commit 3860a88

Please sign in to comment.