Skip to content

Commit

Permalink
Add optional client side timestamps to allow strict ordering in load …
Browse files Browse the repository at this point in the history
…balanced environments
  • Loading branch information
jquirke committed Aug 6, 2018
1 parent d835d64 commit d6fd179
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 52 deletions.
7 changes: 4 additions & 3 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ type SchemaRef struct {
// EntityInfo is all the information about an entity, including the schema reference
// as well as the entity definition
type EntityInfo struct {
Ref *SchemaRef
Def *EntityDefinition
TTL *time.Duration
Ref *SchemaRef
Def *EntityDefinition
TTL *time.Duration
ClientTimestamp *time.Time
}

// StringSet is a set of strings.
Expand Down
47 changes: 39 additions & 8 deletions connectors/yarpc/yarpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"strings"

"github.com/jonboulle/clockwork"
"github.com/pkg/errors"
"github.com/uber-go/dosa"
dosarpc "github.com/uber/dosa-idl/.gen/dosa"
Expand Down Expand Up @@ -59,16 +60,19 @@ func ErrorIsConnectionRefused(err error) bool {

// Config contains the YARPC connector parameters.
type Config struct {
Host string `yaml:"host"`
Port string `yaml:"port"`
CallerName string `yaml:"callerName"`
ServiceName string `yaml:"serviceName"`
Host string `yaml:"host"`
Port string `yaml:"port"`
CallerName string `yaml:"callerName"`
ServiceName string `yaml:"serviceName"`
ClientTimestamp bool `yaml:"clientTimestamp"`
}

// Connector holds the client-side RPC interface and some schema information
type Connector struct {
client dosaclient.Interface
dispatcher *rpc.Dispatcher
client dosaclient.Interface
dispatcher *rpc.Dispatcher
useClientTimestamp bool
clock clockwork.Clock
}

// NewConnector creates a new instance with user provided transport
Expand Down Expand Up @@ -115,8 +119,10 @@ func NewConnector(config Config) (*Connector, error) {
client := dosaclient.New(dispatcher.ClientConfig(config.ServiceName))

return &Connector{
dispatcher: dispatcher,
client: client,
dispatcher: dispatcher,
client: client,
useClientTimestamp: config.ClientTimestamp,
clock: clockwork.NewRealClock(),
}, nil
}

Expand Down Expand Up @@ -171,6 +177,10 @@ func (c *Connector) Upsert(ctx context.Context, ei *dosa.EntityInfo, values map[
TTL: &ttl,
}

if c.useClientTimestamp {
upsertRequest.Timestamp = c.advanceTimestamp()
}

err = c.client.Upsert(ctx, &upsertRequest, VersionHeader())

if !dosarpc.Dosa_Upsert_Helper.IsException(err) {
Expand Down Expand Up @@ -199,6 +209,10 @@ func (c *Connector) MultiUpsert(ctx context.Context, ei *dosa.EntityInfo, multiV
// TTL: &ttl, mgode@ has not yet committed origin/ttl-for-multi-upsert
}

if c.useClientTimestamp {
request.Timestamp = c.advanceTimestamp()
}

response, err := c.client.MultiUpsert(ctx, request, VersionHeader())
if err != nil {
if !dosarpc.Dosa_MultiUpsert_Helper.IsException(err) {
Expand Down Expand Up @@ -343,6 +357,10 @@ func (c *Connector) Remove(ctx context.Context, ei *dosa.EntityInfo, keys map[st
KeyValues: rpcFields,
}

if c.useClientTimestamp {
removeRequest.Timestamp = c.advanceTimestamp()
}

err = c.client.Remove(ctx, removeRequest, VersionHeader())
if err != nil {
if !dosarpc.Dosa_Remove_Helper.IsException(err) {
Expand All @@ -367,6 +385,10 @@ func (c *Connector) MultiRemove(ctx context.Context, ei *dosa.EntityInfo, multiK
KeyValues: keyValues,
}

if c.useClientTimestamp {
request.Timestamp = c.advanceTimestamp()
}

response, err := c.client.MultiRemove(ctx, request, VersionHeader())
if err != nil {
if !dosarpc.Dosa_MultiRemove_Helper.IsException(err) {
Expand Down Expand Up @@ -399,6 +421,10 @@ func (c *Connector) RemoveRange(ctx context.Context, ei *dosa.EntityInfo, column
Conditions: rpcConditions,
}

if c.useClientTimestamp {
request.Timestamp = c.advanceTimestamp()
}

if err := c.client.RemoveRange(ctx, request, VersionHeader()); err != nil {
if !dosarpc.Dosa_RemoveRange_Helper.IsException(err) {
return errors.Wrap(err, "failed to RemoveRange due to network issue")
Expand Down Expand Up @@ -687,3 +713,8 @@ func wrapIDLError(err *dosarpc.Error) error {
// TODO check other fields in the thrift error object such as ShouldRetry
return errors.New(*err.Msg)
}

func (c *Connector) advanceTimestamp() *int64 {
timeStamp := c.clock.Now().UnixNano() / 1000
return &timeStamp
}
98 changes: 95 additions & 3 deletions connectors/yarpc/yarpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/jonboulle/clockwork"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/uber-go/dosa"
Expand Down Expand Up @@ -448,6 +449,22 @@ func TestYARPCClient_Upsert(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "\"c7\"") // must contain name of bad field
assert.Contains(t, err.Error(), "incorrect UUID length") // must mention that the uuid is too short

// cover the timestamp case
testInstant := time.Unix(0, 1999)
testTimestamp := testInstant.UnixNano() / 1000
mockedClient.EXPECT().Upsert(ctx, &drpc.UpsertRequest{
Ref: &testRPCSchemaRef,
EntityValues: outFields,
TTL: &nt,
Timestamp: &testTimestamp,
}, gomock.Any())

sut = Connector{client: mockedClient,
clock: clockwork.NewFakeClockAt(testInstant),
useClientTimestamp: true}
err = sut.Upsert(ctx, testEi, inFields)
assert.Nil(t, err)
}
}

Expand Down Expand Up @@ -512,6 +529,9 @@ func TestYARPCClient_MultiUpsert(t *testing.T) {
testErrMsg := "response error"

ttlVal := int64(1)
// for the timestamp cases
testInstant := time.Unix(0, 1999)
testTimestamp := testInstant.UnixNano() / 1000

testCases := []struct {
NetworkError error
Expand All @@ -520,25 +540,36 @@ func TestYARPCClient_MultiUpsert(t *testing.T) {
Name string
Value interface{}
}
TTL *int64
TTL *int64
timestamp *int64
}{
{
nil,
nil,
getStubbedUpsertRequests()[0],
&ttlVal,
nil,
},
{
nil,
nil,
getStubbedUpsertRequests()[1],
nil,
&testTimestamp,
},
{
nil,
nil,
getStubbedUpsertRequests()[1],
nil,
nil,
},
{
errors.New("an error"),
nil,
getStubbedUpsertRequests()[0],
nil,
nil,
},
{
nil,
Expand All @@ -547,6 +578,7 @@ func TestYARPCClient_MultiUpsert(t *testing.T) {
},
getStubbedUpsertRequests()[0],
nil,
nil,
},
}

Expand Down Expand Up @@ -576,11 +608,14 @@ func TestYARPCClient_MultiUpsert(t *testing.T) {
Ref: &testRPCSchemaRef,
Entities: []drpc.FieldValueMap{outFields},
// TTL: expectedTTL,
Timestamp: testCase.timestamp,
}, gomock.Any()).Return(&drpc.MultiUpsertResponse{Errors: []*drpc.Error{testCase.ResponseError}}, testCase.NetworkError).Times(1)

// create the YARPCClient and give it the mocked RPC interface
// see https://en.wiktionary.org/wiki/SUT for the reason this is called sut
sut := Connector{client: mockedClient}
sut := Connector{client: mockedClient,
clock: clockwork.NewFakeClockAt(testInstant),
useClientTimestamp: testCase.timestamp != nil}

retErrors, err := sut.MultiUpsert(ctx, ei, []map[string]dosa.FieldValue{inFields})
if testCase.NetworkError != nil {
Expand Down Expand Up @@ -611,39 +646,55 @@ func TestYARPCClient_MultiRemove(t *testing.T) {

testErrMsg := "response error"

testInstant := time.Unix(0, 1999)
testTimestamp := testInstant.UnixNano() / 1000

testCases := []struct {
NetworkError error
ResponseError *drpc.Error
RemoveRequest map[string]dosa.FieldValue
timestamp *int64
}{
{
nil,
nil,
getStubbedRemoveRequest(),
nil,
},
{
nil,
nil,
getStubbedRemoveRequest(),
&testTimestamp,
},
{
errors.New("an error"),
nil,
getStubbedRemoveRequest(),
nil,
},
{
nil,
&drpc.Error{
Msg: &testErrMsg,
},
getStubbedRemoveRequest(),
nil,
},
}

for _, testCase := range testCases {
mockedClient.EXPECT().MultiRemove(ctx, &drpc.MultiRemoveRequest{
Ref: &testRPCSchemaRef,
KeyValues: []drpc.FieldValueMap{getStubbedRemoveDOSARequest()},
Timestamp: testCase.timestamp,
}, gomock.Any()).Return(&drpc.MultiRemoveResponse{Errors: []*drpc.Error{testCase.ResponseError}}, testCase.NetworkError).Times(1)

// create the YARPCClient and give it the mocked RPC interface
// see https://en.wiktionary.org/wiki/SUT for the reason this is called sut
sut := Connector{client: mockedClient}
sut := Connector{client: mockedClient,
clock: clockwork.NewFakeClockAt(testInstant),
useClientTimestamp: testCase.timestamp != nil}

retErrors, err := sut.MultiRemove(ctx, testEi, []map[string]dosa.FieldValue{testCase.RemoveRequest})
if testCase.NetworkError != nil {
Expand Down Expand Up @@ -956,6 +1007,31 @@ func TestConnector_RemoveRange(t *testing.T) {
})
assert.Error(t, err)
assert.EqualError(t, errors.Cause(err), "uuid: incorrect UUID length: baduuid")

// cover the timestamp case
testInstant := time.Unix(0, 1999)
testTimestamp := testInstant.UnixNano() / 1000

mockedClient.EXPECT().RemoveRange(ctx, gomock.Any(), gomock.Any()).Do(func(_ context.Context, request *drpc.RemoveRangeRequest, option yarpc2.CallOption) {
assert.Equal(t, testRPCSchemaRef, *request.Ref)
assert.Equal(t, len(request.Conditions), 1)
assert.Equal(t, *request.Timestamp, testTimestamp)
condition := request.Conditions[0]
assert.Equal(t, fieldName, *condition.Field.Name)
assert.Equal(t, field.Value, condition.Field.Value)
assert.Equal(t, &op, condition.Op)
}).Return(nil)

sut = Connector{client: mockedClient,
clock: clockwork.NewFakeClockAt(testInstant),
useClientTimestamp: true}
err = sut.RemoveRange(ctx, testEi, map[string][]*dosa.Condition{
"c1": {&dosa.Condition{
Value: int64(10),
Op: dosa.Eq,
}},
})
assert.NoError(t, err)
}

func TestConnector_Scan(t *testing.T) {
Expand Down Expand Up @@ -1067,6 +1143,22 @@ func TestConnector_Remove(t *testing.T) {
assert.Contains(t, err.Error(), "\"c7\"") // must contain name of bad field
assert.Contains(t, err.Error(), ErrInCorrectUUIDLength) // must mention that the uuid is too short

// cover the timestamp case
testInstant := time.Unix(0, 1999)
testTimestamp := testInstant.UnixNano() / 1000
removeRequest = &drpc.RemoveRequest{
Ref: &testRPCSchemaRef,
KeyValues: getStubbedRemoveDOSARequest(),
Timestamp: &testTimestamp,
}
mockedClient.EXPECT().Remove(ctx, removeRequest, gomock.Any()).Return(nil)

sut = Connector{client: mockedClient,
clock: clockwork.NewFakeClockAt(testInstant),
useClientTimestamp: true}
err = sut.Remove(ctx, testEi, getStubbedRemoveRequest())
assert.Nil(t, err)

// make sure we actually called Read on the interface
ctrl.Finish()
}
Expand Down
Loading

0 comments on commit d6fd179

Please sign in to comment.