Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement MultiUpsert method in client and expose it via the Client interface #304

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## v3.0.0 (unreleased)
- Implement MultiUpsert method in client and expose it via the Client interface (#304)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any breaking changes here, what's the reason for wanting to call this 3.0?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess no reason. should i add a 2.7.0?


## v2.6.0 (2018-04-16)
- Fix bug in invalidating fallback cache on upsert (#292)
Expand Down
79 changes: 69 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,12 @@ type Client interface {
// to update in fieldsToUpdate (or all the fields if you use dosa.All())
Upsert(ctx context.Context, fieldsToUpdate []string, objectToUpdate DomainObject) error

// TODO: Coming in v2.1
// MultiUpsert creates or updates multiple rows. A list of fields to
// update can be specified. Use All() or nil for all fields.
// MultiUpsert(context.Context, []string, ...DomainObject) (MultiResult, error)
// update can be specified. Use All() or nil for all fields. Partial
// successes are possible, so it is critical to inspect the MultiResult response
// to check for failures.
// NOTE: This API only upserts objects of same entity type from same scope.
MultiUpsert(context.Context, []string, ...DomainObject) (MultiResult, error)

// Remove removes a row by primary key. The passed-in entity should contain
// the primary key field values, all other fields are ignored.
Expand All @@ -151,7 +153,7 @@ type Client interface {
// given RemoveRangeOp.
RemoveRange(ctx context.Context, removeRangeOp *RemoveRangeOp) error

// TODO: Coming in v2.1
// TODO: Coming in v2.7
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3.0

// MultiRemove removes multiple rows by primary key. The passed-in entity should
// contain the primary key field values.
// MultiRemove(context.Context, ...DomainObject) (MultiResult, error)
Expand Down Expand Up @@ -410,12 +412,69 @@ func (c *client) createOrUpsert(ctx context.Context, fieldsToUpdate []string, en
return fn(ctx, re.EntityInfo(), fieldValues)
}

// MultiUpsert updates several entities by primary key, The entities provided
// must contain values for all components of its primary key for the operation
// to succeed. If `fieldsToUpdate` is provided, only a subset of fields will be
// updated.
func (c *client) MultiUpsert(context.Context, []string, ...DomainObject) (MultiResult, error) {
panic("not implemented")
// MultiUpsert updates several entities of the same type by primary key, The
// entities provided must contain values for all components of its primary key
// for the operation to succeed. If `fieldsToUpdate` is provided, only a subset
// of fields will be updated. Moreover, all entities being upserted must be part
// of the same partition otherwise the request will be rejected.
// NOTE: This endpoint is not officially released. No guarantees about correctness
// or performance of this API will be guaranteed until v2.6 is released.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remember to update the comment when release

func (c *client) MultiUpsert(ctx context.Context, fieldsToUpdate []string, entities ...DomainObject) (MultiResult, error) {
if !c.initialized {
return nil, &ErrNotInitialized{}
}

if len(entities) == 0 {
return nil, fmt.Errorf("the number of entities to upsert is zero")
}

// lookup registered entity, registry will return error if registration
// is not found
var re *RegisteredEntity
var listMultiValues []map[string]FieldValue
for _, entity := range entities {
ere, err := c.registrar.Find(entity)
if err != nil {
return nil, err
}

if re == nil {
re = ere
} else if re != ere {
return nil, fmt.Errorf("inconsistent entity type for multi upsert: %v vs %v", re, ere)
}

// translate entity field values to a map of primary key name/values pairs
keyFieldValues := re.KeyFieldValues(entity)

// translate remaining entity fields values to map of column name/value pairs
fieldValues, err := re.OnlyFieldValues(entity, fieldsToUpdate)
if err != nil {
return nil, err
}

// merge key and remaining values
for k, v := range keyFieldValues {
fieldValues[k] = v
}

listMultiValues = append(listMultiValues, fieldValues)
}

results, err := c.connector.MultiUpsert(ctx, re.EntityInfo(), listMultiValues)
if err != nil {
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add errors.wrap

}

multiResult := MultiResult{}
// map results to entity fields
for i, entity := range entities {
if results[i] != nil {
multiResult[entity] = results[i]
}
}

return multiResult, nil
}

// Remove deletes an entity by primary key, The entity provided must contain
Expand Down
83 changes: 67 additions & 16 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,70 @@ func TestClient_Upsert(t *testing.T) {
assert.NoError(t, c3.Upsert(ctx, fieldsToUpdate, cte1))
assert.Equal(t, cte1.Email, updatedEmail)
}

func TestClient_MultiUpsert(t *testing.T) {
reg1, _ := dosaRenamed.NewRegistrar(scope, namePrefix, cte1)
reg2, _ := dosaRenamed.NewRegistrar(scope, namePrefix, cte1, cte2)
fieldsToUpsert := []string{"Email"}

e1 := &ClientTestEntity1{ID: int64(1), Email: "[email protected]"}
e2 := &ClientTestEntity1{ID: int64(2), Email: "[email protected]"}

// uninitialized
c1 := dosaRenamed.NewClient(reg1, nullConnector)
_, err := c1.MultiUpsert(ctx, dosaRenamed.All(), cte1)
assert.Error(t, err)

// empty upsert
c1.Initialize(ctx)
_, err = c1.MultiUpsert(ctx, dosaRenamed.All())
assert.Error(t, err)
assert.Contains(t, err.Error(), "zero")

// unregistered object
c1.Initialize(ctx)
_, err = c1.MultiUpsert(ctx, dosaRenamed.All(), cte2)
assert.Error(t, err)
assert.Contains(t, err.Error(), "ClientTestEntity2")

// multi read different types of object
c1.Initialize(ctx)
_, err = c1.MultiUpsert(ctx, dosaRenamed.All(), cte2, cte1)
assert.Error(t, err)
assert.Contains(t, err.Error(), "ClientTestEntity2")

// happy path, mock connector
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockConn := mocks.NewMockConnector(ctrl)
mockConn.EXPECT().CheckSchema(ctx, gomock.Any(), gomock.Any(), gomock.Any()).Return(int32(1), nil).AnyTimes()
mockConn.EXPECT().MultiUpsert(ctx, gomock.Any(), gomock.Any()).
Do(func(_ context.Context, _ *dosaRenamed.EntityInfo, allValues []map[string]dosaRenamed.FieldValue) {
assert.Equal(t, allValues[0]["id"], e1.ID)
assert.Equal(t, allValues[0]["email"], e1.Email)
assert.Equal(t, allValues[1]["id"], e2.ID)
assert.Equal(t, allValues[1]["email"], e2.Email)

}).Return([]error{nil, nil}, nil).Times(1)
c2 := dosaRenamed.NewClient(reg2, mockConn)
assert.NoError(t, c2.Initialize(ctx))
rs, err := c2.MultiUpsert(ctx, fieldsToUpsert, e1, e2)
assert.NoError(t, err)
assert.Empty(t, rs)

// system error, mock connector
mockConn.EXPECT().MultiUpsert(ctx, gomock.Any(), gomock.Any()).Return([]error{nil, nil}, errors.New("connector error"))
rs, err = c2.MultiUpsert(ctx, fieldsToUpsert, e1, e2)
assert.Error(t, err)
assert.Empty(t, rs)

// single entity error, mock connector
mockConn.EXPECT().MultiUpsert(ctx, gomock.Any(), gomock.Any()).Return([]error{errors.New("single error"), nil}, nil)
rs, err = c2.MultiUpsert(ctx, fieldsToUpsert, e1, e2)
assert.NoError(t, err)
assert.NotNil(t, rs[e1])
}

func TestClient_CreateIfNotExists(t *testing.T) {
reg1, _ := dosaRenamed.NewRegistrar("test", "team.service", cte1)
reg2, _ := dosaRenamed.NewRegistrar("test", "team.service", cte1, cte2)
Expand Down Expand Up @@ -726,11 +790,12 @@ func TestClient_MultiRead(t *testing.T) {

// uninitialized
c1 := dosaRenamed.NewClient(reg1, nullConnector)
assert.Error(t, c1.Read(ctx, fieldsToRead, cte1))
_, err := c1.MultiRead(ctx, fieldsToRead, cte1)
assert.Error(t, err)

// unregistered object
c1.Initialize(ctx)
_, err := c1.MultiRead(ctx, dosaRenamed.All(), cte2)
_, err = c1.MultiRead(ctx, dosaRenamed.All(), cte2)
assert.Error(t, err)
assert.Contains(t, err.Error(), "ClientTestEntity2")

Expand Down Expand Up @@ -762,20 +827,6 @@ func TestClient_MultiRead(t *testing.T) {
assert.Equal(t, rs[e2].Error(), "not fonud")
}

/* TODO: Coming in v2.1
func TestClient_Unimplemented(t *testing.T) {
reg1, _ := dosaRenamed.NewRegistrar(scope, namePrefix, cte1)

c := dosaRenamed.NewClient(reg1, nullConnector)
assert.Panics(t, func() {
c.MultiUpsert(ctx, dosaRenamed.All(), &ClientTestEntity1{})
})
assert.Panics(t, func() {
c.MultiRemove(ctx, &ClientTestEntity1{})
})
}
*/

func TestAdminClient_CreateScope(t *testing.T) {
c := dosaRenamed.NewAdminClient(nullConnector)
assert.NotNil(t, c)
Expand Down
16 changes: 16 additions & 0 deletions mocks/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,22 @@ func (_mr *_MockClientRecorder) MultiRead(arg0, arg1 interface{}, arg2 ...interf
return _mr.mock.ctrl.RecordCall(_mr.mock, "MultiRead", _s...)
}

func (_m *MockClient) MultiUpsert(_param0 context.Context, _param1 []string, _param2 ...dosa.DomainObject) (dosa.MultiResult, error) {
_s := []interface{}{_param0, _param1}
for _, _x := range _param2 {
_s = append(_s, _x)
}
ret := _m.ctrl.Call(_m, "MultiUpsert", _s...)
ret0, _ := ret[0].(dosa.MultiResult)
ret1, _ := ret[1].(error)
return ret0, ret1
}

func (_mr *_MockClientRecorder) MultiUpsert(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
_s := append([]interface{}{arg0, arg1}, arg2...)
return _mr.mock.ctrl.RecordCall(_mr.mock, "MultiUpsert", _s...)
}

func (_m *MockClient) Range(_param0 context.Context, _param1 *dosa.RangeOp) ([]dosa.DomainObject, string, error) {
ret := _m.ctrl.Call(_m, "Range", _param0, _param1)
ret0, _ := ret[0].([]dosa.DomainObject)
Expand Down