diff --git a/CHANGELOG.md b/CHANGELOG.md index 68e19944..91db2f90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## v3.0.0 (unreleased) +- Implement MultiUpsert method in client and expose it via the Client interface (#304) ## v2.6.0 (2018-04-16) - Fix bug in invalidating fallback cache on upsert (#292) diff --git a/client.go b/client.go index 2965cc5a..b87cdd9b 100644 --- a/client.go +++ b/client.go @@ -138,10 +138,12 @@ type Client interface { // to update in fieldsToUpdate (or all the fields if you use dosa.All()) Upsert(ctx context.Context, fieldsToUpdate []string, objectToUpdate DomainObject) error - // TODO: Coming in v2.1 // MultiUpsert creates or updates multiple rows. A list of fields to - // update can be specified. Use All() or nil for all fields. - // MultiUpsert(context.Context, []string, ...DomainObject) (MultiResult, error) + // update can be specified. Use All() or nil for all fields. Partial + // successes are possible, so it is critical to inspect the MultiResult response + // to check for failures. + // NOTE: This API only upserts objects of same entity type from same scope. + MultiUpsert(context.Context, []string, ...DomainObject) (MultiResult, error) // Remove removes a row by primary key. The passed-in entity should contain // the primary key field values, all other fields are ignored. @@ -151,7 +153,7 @@ type Client interface { // given RemoveRangeOp. RemoveRange(ctx context.Context, removeRangeOp *RemoveRangeOp) error - // TODO: Coming in v2.1 + // TODO: Coming in future versions // MultiRemove removes multiple rows by primary key. The passed-in entity should // contain the primary key field values. // MultiRemove(context.Context, ...DomainObject) (MultiResult, error) @@ -410,12 +412,70 @@ func (c *client) createOrUpsert(ctx context.Context, fieldsToUpdate []string, en return fn(ctx, re.EntityInfo(), fieldValues) } -// MultiUpsert updates several entities by primary key, The entities provided -// must contain values for all components of its primary key for the operation -// to succeed. If `fieldsToUpdate` is provided, only a subset of fields will be -// updated. -func (c *client) MultiUpsert(context.Context, []string, ...DomainObject) (MultiResult, error) { - panic("not implemented") +// MultiUpsert updates several entities of the same type by primary key, The +// entities provided must contain values for all components of its primary key +// for the operation to succeed. If `fieldsToUpdate` is provided, only a subset +// of fields will be updated. Moreover, all entities being upserted must be part +// of the same partition otherwise the request will be rejected. This is enforced +// server side. +// NOTE: This endpoint is not officially released. No guarantees about correctness +// or performance of this API will be guaranteed until v3.0.0 is released. +func (c *client) MultiUpsert(ctx context.Context, fieldsToUpdate []string, entities ...DomainObject) (MultiResult, error) { + if !c.initialized { + return nil, &ErrNotInitialized{} + } + + if len(entities) == 0 { + return nil, fmt.Errorf("the number of entities to upsert is zero") + } + + // lookup registered entity, registry will return error if registration + // is not found + var re *RegisteredEntity + var listMultiValues []map[string]FieldValue + for _, entity := range entities { + ere, err := c.registrar.Find(entity) + if err != nil { + return nil, err + } + + if re == nil { + re = ere + } else if re != ere { + return nil, fmt.Errorf("inconsistent entity type for multi upsert: %v vs %v", re, ere) + } + + // translate entity field values to a map of primary key name/values pairs + keyFieldValues := re.KeyFieldValues(entity) + + // translate remaining entity fields values to map of column name/value pairs + fieldValues, err := re.OnlyFieldValues(entity, fieldsToUpdate) + if err != nil { + return nil, err + } + + // merge key and remaining values + for k, v := range keyFieldValues { + fieldValues[k] = v + } + + listMultiValues = append(listMultiValues, fieldValues) + } + + results, err := c.connector.MultiUpsert(ctx, re.EntityInfo(), listMultiValues) + if err != nil { + return nil, errors.Wrap(err, "MultiUpsert") + } + + multiResult := MultiResult{} + // map results to entity fields + for i, entity := range entities { + if results[i] != nil { + multiResult[entity] = results[i] + } + } + + return multiResult, nil } // Remove deletes an entity by primary key, The entity provided must contain diff --git a/client_test.go b/client_test.go index cf1e12e8..ca40dbb2 100644 --- a/client_test.go +++ b/client_test.go @@ -400,6 +400,70 @@ func TestClient_Upsert(t *testing.T) { assert.NoError(t, c3.Upsert(ctx, fieldsToUpdate, cte1)) assert.Equal(t, cte1.Email, updatedEmail) } + +func TestClient_MultiUpsert(t *testing.T) { + reg1, _ := dosaRenamed.NewRegistrar(scope, namePrefix, cte1) + reg2, _ := dosaRenamed.NewRegistrar(scope, namePrefix, cte1, cte2) + fieldsToUpsert := []string{"Email"} + + e1 := &ClientTestEntity1{ID: int64(1), Email: "bar@email.com"} + e2 := &ClientTestEntity1{ID: int64(2), Email: "foo@email.com"} + + // uninitialized + c1 := dosaRenamed.NewClient(reg1, nullConnector) + _, err := c1.MultiUpsert(ctx, dosaRenamed.All(), cte1) + assert.Error(t, err) + + // empty upsert + c1.Initialize(ctx) + _, err = c1.MultiUpsert(ctx, dosaRenamed.All()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "zero") + + // unregistered object + c1.Initialize(ctx) + _, err = c1.MultiUpsert(ctx, dosaRenamed.All(), cte2) + assert.Error(t, err) + assert.Contains(t, err.Error(), "ClientTestEntity2") + + // multi read different types of object + c1.Initialize(ctx) + _, err = c1.MultiUpsert(ctx, dosaRenamed.All(), cte2, cte1) + assert.Error(t, err) + assert.Contains(t, err.Error(), "ClientTestEntity2") + + // happy path, mock connector + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockConn := mocks.NewMockConnector(ctrl) + mockConn.EXPECT().CheckSchema(ctx, gomock.Any(), gomock.Any(), gomock.Any()).Return(int32(1), nil).AnyTimes() + mockConn.EXPECT().MultiUpsert(ctx, gomock.Any(), gomock.Any()). + Do(func(_ context.Context, _ *dosaRenamed.EntityInfo, allValues []map[string]dosaRenamed.FieldValue) { + assert.Equal(t, allValues[0]["id"], e1.ID) + assert.Equal(t, allValues[0]["email"], e1.Email) + assert.Equal(t, allValues[1]["id"], e2.ID) + assert.Equal(t, allValues[1]["email"], e2.Email) + + }).Return([]error{nil, nil}, nil).Times(1) + c2 := dosaRenamed.NewClient(reg2, mockConn) + assert.NoError(t, c2.Initialize(ctx)) + rs, err := c2.MultiUpsert(ctx, fieldsToUpsert, e1, e2) + assert.NoError(t, err) + assert.Empty(t, rs) + + // system error, mock connector + mockConn.EXPECT().MultiUpsert(ctx, gomock.Any(), gomock.Any()).Return([]error{nil, nil}, errors.New("connector error")) + rs, err = c2.MultiUpsert(ctx, fieldsToUpsert, e1, e2) + assert.Error(t, err) + assert.Empty(t, rs) + + // single entity error, mock connector + mockConn.EXPECT().MultiUpsert(ctx, gomock.Any(), gomock.Any()).Return([]error{errors.New("single error"), nil}, nil) + rs, err = c2.MultiUpsert(ctx, fieldsToUpsert, e1, e2) + assert.NoError(t, err) + assert.NotNil(t, rs[e1]) +} + func TestClient_CreateIfNotExists(t *testing.T) { reg1, _ := dosaRenamed.NewRegistrar("test", "team.service", cte1) reg2, _ := dosaRenamed.NewRegistrar("test", "team.service", cte1, cte2) @@ -726,11 +790,12 @@ func TestClient_MultiRead(t *testing.T) { // uninitialized c1 := dosaRenamed.NewClient(reg1, nullConnector) - assert.Error(t, c1.Read(ctx, fieldsToRead, cte1)) + _, err := c1.MultiRead(ctx, fieldsToRead, cte1) + assert.Error(t, err) // unregistered object c1.Initialize(ctx) - _, err := c1.MultiRead(ctx, dosaRenamed.All(), cte2) + _, err = c1.MultiRead(ctx, dosaRenamed.All(), cte2) assert.Error(t, err) assert.Contains(t, err.Error(), "ClientTestEntity2") @@ -762,20 +827,6 @@ func TestClient_MultiRead(t *testing.T) { assert.Equal(t, rs[e2].Error(), "not fonud") } -/* TODO: Coming in v2.1 -func TestClient_Unimplemented(t *testing.T) { - reg1, _ := dosaRenamed.NewRegistrar(scope, namePrefix, cte1) - - c := dosaRenamed.NewClient(reg1, nullConnector) - assert.Panics(t, func() { - c.MultiUpsert(ctx, dosaRenamed.All(), &ClientTestEntity1{}) - }) - assert.Panics(t, func() { - c.MultiRemove(ctx, &ClientTestEntity1{}) - }) -} -*/ - func TestAdminClient_CreateScope(t *testing.T) { c := dosaRenamed.NewAdminClient(nullConnector) assert.NotNil(t, c) diff --git a/mocks/client.go b/mocks/client.go index d4e1f5da..350cc03c 100644 --- a/mocks/client.go +++ b/mocks/client.go @@ -97,6 +97,22 @@ func (_mr *_MockClientRecorder) MultiRead(arg0, arg1 interface{}, arg2 ...interf return _mr.mock.ctrl.RecordCall(_mr.mock, "MultiRead", _s...) } +func (_m *MockClient) MultiUpsert(_param0 context.Context, _param1 []string, _param2 ...dosa.DomainObject) (dosa.MultiResult, error) { + _s := []interface{}{_param0, _param1} + for _, _x := range _param2 { + _s = append(_s, _x) + } + ret := _m.ctrl.Call(_m, "MultiUpsert", _s...) + ret0, _ := ret[0].(dosa.MultiResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockClientRecorder) MultiUpsert(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + _s := append([]interface{}{arg0, arg1}, arg2...) + return _mr.mock.ctrl.RecordCall(_mr.mock, "MultiUpsert", _s...) +} + func (_m *MockClient) Range(_param0 context.Context, _param1 *dosa.RangeOp) ([]dosa.DomainObject, string, error) { ret := _m.ctrl.Call(_m, "Range", _param0, _param1) ret0, _ := ret[0].([]dosa.DomainObject)