Skip to content

Commit

Permalink
Add Metrics to Dosa Library
Browse files Browse the repository at this point in the history
  • Loading branch information
awani-m committed Sep 14, 2021
1 parent 53d1ffd commit 6c5c2de
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 39 deletions.
22 changes: 20 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/uber-go/dosa/metrics"
"io"
"reflect"
"time"
Expand Down Expand Up @@ -271,14 +272,16 @@ type client struct {
initialized bool
registrar Registrar
connector Connector
stats metrics.Scope
}

// NewClient returns a new DOSA client for the registrar and connector provided.
// This is currently only a partial implementation to demonstrate basic CRUD functionality.
func NewClient(reg Registrar, conn Connector) Client {
func NewClient(reg Registrar, conn Connector, scope metrics.Scope) Client {
return &client{
registrar: reg,
connector: conn,
stats: metrics.CheckIfNilStats(scope),
}
}

Expand Down Expand Up @@ -318,6 +321,10 @@ func (c *client) Initialize(ctx context.Context) error {
return nil
}

func (c *client) incStat(action, method string) {
c.stats.SubScope("dosa").Tagged(map[string]string{"method": method}).Counter(action).Inc(1)
}

// CreateIfNotExists creates a row, but only if it does not exist. The entity
// provided must contain values for all components of its primary key for the
// operation to succeed.
Expand Down Expand Up @@ -352,6 +359,7 @@ func (c *client) Read(ctx context.Context, fieldsToRead []string, entity DomainO

results, err := c.connector.Read(ctx, re.EntityInfo(), fieldValues, columnsToRead)
if err != nil {
c.incStat("error", "Read")
return err
}

Expand Down Expand Up @@ -402,6 +410,7 @@ func (c *client) MultiRead(ctx context.Context, fieldsToRead []string, entities

results, err := c.connector.MultiRead(ctx, re.EntityInfo(), listFieldValues, columnsToRead)
if err != nil {
c.incStat("error", "MultiRead")
return nil, err
}

Expand Down Expand Up @@ -466,7 +475,11 @@ func (c *client) createOrUpsert(ctx context.Context, fieldsToUpdate []string, en
ei.TTL = dynTTL
}

return fn(ctx, ei, fieldValues)
err = fn(ctx, ei, fieldValues)
if err != nil {
c.incStat("error", "CreateOrUpsert")
}
return
}

// Remove deletes an entity by primary key, The entity provided must contain
Expand All @@ -486,6 +499,9 @@ func (c *client) Remove(ctx context.Context, entity DomainObject) error {
keyFieldValues := re.KeyFieldValues(entity)

err = c.connector.Remove(ctx, re.EntityInfo(), keyFieldValues)
if err != nil {
c.incStat("error", "Remove")
}
return err
}

Expand Down Expand Up @@ -537,6 +553,7 @@ func (c *client) Range(ctx context.Context, r *RangeOp) ([]DomainObject, string,
// call the server side method
values, token, err := c.connector.Range(ctx, re.EntityInfo(), columnConditions, fieldsToRead, r.token, r.limit)
if err != nil {
c.incStat("error", "Range")
return nil, "", errors.Wrap(err, "Range")
}

Expand Down Expand Up @@ -598,6 +615,7 @@ func (c *client) ScanEverything(ctx context.Context, sop *ScanOp) ([]DomainObjec
// call the server side method
values, token, err := c.connector.Scan(ctx, re.EntityInfo(), fieldsToRead, sop.token, sop.limit)
if err != nil {
c.incStat("error", "Scan")
return nil, "", err
}
objectArray := objectsFromValueArray(sop.object, values, re, nil)
Expand Down
70 changes: 35 additions & 35 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func ExampleNewClient() {
conn := devnull.NewConnector()

// create the client using the registrar and connector
client := dosaRenamed.NewClient(reg, conn)
client := dosaRenamed.NewClient(reg, conn, )

err = client.Initialize(context.Background())
if err != nil {
Expand All @@ -101,7 +101,7 @@ func TestNewClient(t *testing.T) {
assert.NotNil(t, reg)

// initialize a pseudo-connected client
client := dosaRenamed.NewClient(reg, nullConnector)
client := dosaRenamed.NewClient(reg, nullConnector, nil)
err = client.Initialize(ctx)
assert.NoError(t, err)
}
Expand All @@ -113,17 +113,17 @@ func TestClient_Initialize(t *testing.T) {
reg, _ := dosaRenamed.NewRegistrar("test", "team.service", cte1)

// find error
c1 := dosaRenamed.NewClient(emptyReg, nullConnector)
c1 := dosaRenamed.NewClient(emptyReg, nullConnector, nil)
assert.Error(t, c1.Initialize(ctx))

// CheckSchema error
errConn := mocks.NewMockConnector(ctrl)
errConn.EXPECT().CheckSchema(ctx, gomock.Any(), gomock.Any(), gomock.Any()).Return(int32(-1), errors.New("CheckSchema error")).AnyTimes()
c2 := dosaRenamed.NewClient(reg, errConn)
c2 := dosaRenamed.NewClient(reg, errConn, nil)
assert.Error(t, c2.Initialize(ctx))

// happy path
c3 := dosaRenamed.NewClient(reg, nullConnector)
c3 := dosaRenamed.NewClient(reg, nullConnector, nil)
assert.NoError(t, c3.Initialize(ctx))

// already initialized
Expand All @@ -141,7 +141,7 @@ func TestClient_Read(t *testing.T) {
}

// uninitialized
c1 := dosaRenamed.NewClient(reg1, nullConnector)
c1 := dosaRenamed.NewClient(reg1, nullConnector, nil)
assert.Error(t, c1.Read(ctx, fieldsToRead, cte1))

// unregistered object
Expand All @@ -161,7 +161,7 @@ func TestClient_Read(t *testing.T) {
assert.Equal(t, columnsToRead, []string{"id", "email"})

}).Return(results, nil).MinTimes(1)
c3 := dosaRenamed.NewClient(reg2, mockConn)
c3 := dosaRenamed.NewClient(reg2, mockConn, nil)
assert.NoError(t, c3.Initialize(ctx))
assert.NoError(t, c3.Read(ctx, fieldsToRead, cte1))
assert.Equal(t, cte1.ID, results["id"])
Expand All @@ -180,7 +180,7 @@ func TestClient_Read_pointer_result(t *testing.T) {
}

// uninitialized
c1 := dosaRenamed.NewClient(reg1, nullConnector)
c1 := dosaRenamed.NewClient(reg1, nullConnector, nil)
assert.Error(t, c1.Read(ctx, fieldsToRead, cte1))

// unregistered object
Expand All @@ -200,7 +200,7 @@ func TestClient_Read_pointer_result(t *testing.T) {
assert.Equal(t, columnsToRead, []string{"id", "email"})

}).Return(results, nil).MinTimes(1)
c3 := dosaRenamed.NewClient(reg2, mockConn)
c3 := dosaRenamed.NewClient(reg2, mockConn, nil)
assert.NoError(t, c3.Initialize(ctx))
assert.NoError(t, c3.Read(ctx, fieldsToRead, cte1))
testutil.AssertEqForPointer(testAssert(t), cte1.ID, results["id"])
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestClient_Read_pointer(t *testing.T) {
Do(func(_ context.Context, _ *dosaRenamed.EntityInfo, columnValues map[string]dosaRenamed.FieldValue, columnsToRead []string) {
assert.Equal(t, columnValues["id"], allTypes.ID)
}).Return(results, nil).MinTimes(1)
c3 := dosaRenamed.NewClient(reg1, mockConn)
c3 := dosaRenamed.NewClient(reg1, mockConn, nil)
assert.NoError(t, c3.Initialize(ctx))
assert.NoError(t, c3.Read(ctx, dosaRenamed.All(), allTypes))
assert.Equal(t, allTypes.ID, results["id"])
Expand Down Expand Up @@ -298,7 +298,7 @@ func TestClient_Read_Errors(t *testing.T) {
assert.NotEmpty(t, columnsToRead)
}).Return(nil, readError)

c1 := dosaRenamed.NewClient(reg1, mockConn)
c1 := dosaRenamed.NewClient(reg1, mockConn, nil)
assert.NoError(t, c1.Initialize(ctx))
err := c1.Read(ctx, dosaRenamed.All(), cte1)
assert.Error(t, err)
Expand All @@ -315,11 +315,11 @@ func TestClient_Upsert(t *testing.T) {
updatedEmail := "[email protected]"

// uninitialized
c1 := dosaRenamed.NewClient(reg1, nullConnector)
c1 := dosaRenamed.NewClient(reg1, nullConnector, nil)
assert.Error(t, c1.Upsert(ctx, fieldsToUpdate, cte1))

// unregistered object error
c2 := dosaRenamed.NewClient(reg1, nullConnector)
c2 := dosaRenamed.NewClient(reg1, nullConnector, nil)
c2.Initialize(ctx)
assert.Error(t, c2.Upsert(ctx, fieldsToUpdate, cte2))

Expand All @@ -335,7 +335,7 @@ func TestClient_Upsert(t *testing.T) {
cte1.Email = updatedEmail
}).
Return(nil).MinTimes(1)
c3 := dosaRenamed.NewClient(reg2, mockConn)
c3 := dosaRenamed.NewClient(reg2, mockConn, nil)
assert.NoError(t, c3.Initialize(ctx))
assert.NoError(t, c3.Upsert(ctx, fieldsToUpdate, cte1))
assert.Equal(t, cte1.Email, updatedEmail)
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestClient_Upsert_DynTTL(t *testing.T) {
idx++
}).
Return(nil).MinTimes(1)
c1 := dosaRenamed.NewClient(reg1, mockConn)
c1 := dosaRenamed.NewClient(reg1, mockConn, nil)
assert.NoError(t, c1.Initialize(ctx))
for _, ttl := range ttls {
if ttl != dosaRenamed.NoTTL() {
Expand All @@ -373,7 +373,7 @@ func TestClient_Upsert_DynTTL(t *testing.T) {
}

// invalid case
c2 := dosaRenamed.NewClient(reg1, nullConnector)
c2 := dosaRenamed.NewClient(reg1, nullConnector, nil)
invalidTTL := 998 * time.Millisecond
cte3.TTL(&invalidTTL)
assert.Error(t, c2.Upsert(ctx, []string{}, cte3))
Expand All @@ -385,11 +385,11 @@ func TestClient_CreateIfNotExists(t *testing.T) {
updatedEmail := "[email protected]"

// uninitialized
c1 := dosaRenamed.NewClient(reg1, nullConnector)
c1 := dosaRenamed.NewClient(reg1, nullConnector, nil)
assert.Error(t, c1.CreateIfNotExists(ctx, cte1))

// unregistered object error
c2 := dosaRenamed.NewClient(reg1, nullConnector)
c2 := dosaRenamed.NewClient(reg1, nullConnector, nil)
c2.Initialize(ctx)
assert.Error(t, c2.CreateIfNotExists(ctx, cte2))

Expand All @@ -405,7 +405,7 @@ func TestClient_CreateIfNotExists(t *testing.T) {
cte1.Email = updatedEmail
}).
Return(nil).MinTimes(1)
c3 := dosaRenamed.NewClient(reg2, mockConn)
c3 := dosaRenamed.NewClient(reg2, mockConn, nil)
assert.NoError(t, c3.Initialize(ctx))
assert.NoError(t, c3.CreateIfNotExists(ctx, cte1))
assert.Equal(t, cte1.Email, updatedEmail)
Expand Down Expand Up @@ -433,7 +433,7 @@ func TestClient_CreateIfNotExists_DynTTL(t *testing.T) {
idx++
}).
Return(nil).MinTimes(1)
c1 := dosaRenamed.NewClient(reg1, mockConn)
c1 := dosaRenamed.NewClient(reg1, mockConn, nil)
assert.NoError(t, c1.Initialize(ctx))
for _, ttl := range ttls {
if ttl != dosaRenamed.NoTTL() {
Expand All @@ -443,7 +443,7 @@ func TestClient_CreateIfNotExists_DynTTL(t *testing.T) {
}

// invalid case
c2 := dosaRenamed.NewClient(reg1, nullConnector)
c2 := dosaRenamed.NewClient(reg1, nullConnector, nil)
invalidTTL := 998 * time.Millisecond
cte3.TTL(&invalidTTL)
assert.Error(t, c2.CreateIfNotExists(ctx, cte3))
Expand All @@ -457,7 +457,7 @@ func TestClient_Upsert_Errors(t *testing.T) {
mockConn := mocks.NewMockConnector(ctrl)
mockConn.EXPECT().CheckSchema(ctx, gomock.Any(), gomock.Any(), gomock.Any()).Return(int32(1), nil).AnyTimes()

c1 := dosaRenamed.NewClient(reg1, mockConn)
c1 := dosaRenamed.NewClient(reg1, mockConn, nil)
assert.NoError(t, c1.Initialize(ctx))
mockConn.EXPECT().Upsert(ctx, gomock.Any(), gomock.Not(dosaRenamed.All())).Return(nil)
err := c1.Upsert(ctx, dosaRenamed.All(), cte1)
Expand All @@ -475,7 +475,7 @@ func TestClient_Upsert_Errors(t *testing.T) {
func TestClient_RemoveRange(t *testing.T) {
reg1, _ := dosaRenamed.NewRegistrar(scope, namePrefix, cte1)

c1 := dosaRenamed.NewClient(reg1, nullConnector)
c1 := dosaRenamed.NewClient(reg1, nullConnector, nil)
rop := dosaRenamed.NewRemoveRangeOp(cte1).Eq("ID", "123")
err := c1.RemoveRange(ctx, rop)
assert.True(t, dosaRenamed.ErrorIsNotInitialized(err))
Expand All @@ -501,7 +501,7 @@ func TestClient_RemoveRange(t *testing.T) {
mockConn := mocks.NewMockConnector(ctrl)
mockConn.EXPECT().CheckSchema(ctx, gomock.Any(), gomock.Any(), gomock.Any()).Return(int32(1), nil).AnyTimes()
mockConn.EXPECT().RemoveRange(ctx, gomock.Any(), gomock.Any()).Return(nil)
c2 := dosaRenamed.NewClient(reg1, mockConn)
c2 := dosaRenamed.NewClient(reg1, mockConn, nil)
c2.Initialize(ctx)
rop = dosaRenamed.NewRemoveRangeOp(cte1)
err = c2.RemoveRange(ctx, rop)
Expand All @@ -518,7 +518,7 @@ func TestClient_Range(t *testing.T) {
}

// uninitialized
c1 := dosaRenamed.NewClient(reg1, nullConnector)
c1 := dosaRenamed.NewClient(reg1, nullConnector, nil)
rop := dosaRenamed.NewRangeOp(cte1).Fields(fieldsToRead).Eq("ID", "123").Offset("tokeytoketoke")
_, _, err := c1.Range(ctx, rop)
assert.True(t, dosaRenamed.ErrorIsNotInitialized(err))
Expand Down Expand Up @@ -553,7 +553,7 @@ func TestClient_Range(t *testing.T) {
mockConn.EXPECT().CheckSchema(ctx, gomock.Any(), gomock.Any(), gomock.Any()).Return(int32(1), nil).AnyTimes()
mockConn.EXPECT().Range(ctx, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return([]map[string]dosaRenamed.FieldValue{resultRow}, "continuation-token", nil)
c2 := dosaRenamed.NewClient(reg1, mockConn)
c2 := dosaRenamed.NewClient(reg1, mockConn, nil)
c2.Initialize(ctx)
rop = dosaRenamed.NewRangeOp(cte1)
rows, token, err := c2.Range(ctx, rop)
Expand Down Expand Up @@ -588,7 +588,7 @@ func TestClient_WalkRange(t *testing.T) {
}

// uninitialized
c0 := dosaRenamed.NewClient(reg1, nullConnector)
c0 := dosaRenamed.NewClient(reg1, nullConnector, nil)
rop := dosaRenamed.NewRangeOp(cte1).Fields(fieldsToRead).Eq("ID", "123").Offset("tokeytoketoke")
err := c0.WalkRange(ctx, rop, func(value dosaRenamed.DomainObject) error {
return nil
Expand All @@ -604,7 +604,7 @@ func TestClient_WalkRange(t *testing.T) {
Return([]map[string]dosaRenamed.FieldValue{resultRow0}, "token0", nil)
mockConn1.EXPECT().Range(ctx, gomock.Any(), gomock.Any(), gomock.Any(), "token0", gomock.Any()).
Return([]map[string]dosaRenamed.FieldValue{resultRow1}, "", nil)
c1 := dosaRenamed.NewClient(reg1, mockConn1)
c1 := dosaRenamed.NewClient(reg1, mockConn1, nil)
c1.Initialize(ctx)
rop = dosaRenamed.NewRangeOp(cte1)

Expand All @@ -628,7 +628,7 @@ func TestClient_WalkRange(t *testing.T) {
mockConn2.EXPECT().CheckSchema(ctx, gomock.Any(), gomock.Any(), gomock.Any()).Return(int32(1), nil)
mockConn2.EXPECT().Range(ctx, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return([]map[string]dosaRenamed.FieldValue{resultRow0}, "", nil)
c2 := dosaRenamed.NewClient(reg1, mockConn2)
c2 := dosaRenamed.NewClient(reg1, mockConn2, nil)
c2.Initialize(ctx)
rop = dosaRenamed.NewRangeOp(cte1)
err = c2.WalkRange(ctx, rop, func(value dosaRenamed.DomainObject) error {
Expand All @@ -648,7 +648,7 @@ func TestClient_ScanEverything(t *testing.T) {
}

// uninitialized
c1 := dosaRenamed.NewClient(reg1, nullConnector)
c1 := dosaRenamed.NewClient(reg1, nullConnector, nil)
sop := dosaRenamed.NewScanOp(cte1).Fields(fieldsToRead).Offset("tokeytoketoke")
_, _, err := c1.ScanEverything(ctx, sop)
assert.True(t, dosaRenamed.ErrorIsNotInitialized(err))
Expand All @@ -675,7 +675,7 @@ func TestClient_ScanEverything(t *testing.T) {
mockConn.EXPECT().CheckSchema(ctx, gomock.Any(), gomock.Any(), gomock.Any()).Return(int32(1), nil).AnyTimes()
mockConn.EXPECT().Scan(ctx, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return([]map[string]dosaRenamed.FieldValue{resultRow}, "continuation-token", nil)
c2 := dosaRenamed.NewClient(reg1, mockConn)
c2 := dosaRenamed.NewClient(reg1, mockConn, nil)
c2.Initialize(ctx)
sop = dosaRenamed.NewScanOp(cte1)
rows, token, err := c2.ScanEverything(ctx, sop)
Expand All @@ -699,7 +699,7 @@ func TestClient_Remove(t *testing.T) {
reg1, _ := dosaRenamed.NewRegistrar(scope, namePrefix, cte1)

// uninitialized
c1 := dosaRenamed.NewClient(reg1, nullConnector)
c1 := dosaRenamed.NewClient(reg1, nullConnector, nil)
err := c1.Remove(ctx, cte1)
assert.True(t, dosaRenamed.ErrorIsNotInitialized(err))

Expand All @@ -715,7 +715,7 @@ func TestClient_Remove(t *testing.T) {
mockConn := mocks.NewMockConnector(ctrl)
mockConn.EXPECT().CheckSchema(ctx, gomock.Any(), gomock.Any(), gomock.Any()).Return(int32(1), nil).AnyTimes()
mockConn.EXPECT().Remove(ctx, gomock.Any(), map[string]dosaRenamed.FieldValue{"id": dosaRenamed.FieldValue(int64(123))}).Return(nil)
c2 := dosaRenamed.NewClient(reg1, mockConn)
c2 := dosaRenamed.NewClient(reg1, mockConn, nil)
c2.Initialize(ctx)
err = c2.Remove(ctx, &ClientTestEntity1{ID: int64(123)})
assert.NoError(t, err)
Expand All @@ -742,7 +742,7 @@ func TestClient_MultiRead(t *testing.T) {
}

// uninitialized
c1 := dosaRenamed.NewClient(reg1, nullConnector)
c1 := dosaRenamed.NewClient(reg1, nullConnector, nil)
assert.Error(t, c1.Read(ctx, fieldsToRead, cte1))

// unregistered object
Expand All @@ -769,7 +769,7 @@ func TestClient_MultiRead(t *testing.T) {
assert.Equal(t, columnsToRead, []string{"id", "email"})

}).Return(results, nil).MinTimes(1)
c3 := dosaRenamed.NewClient(reg2, mockConn)
c3 := dosaRenamed.NewClient(reg2, mockConn, nil)
assert.NoError(t, c3.Initialize(ctx))
rs, err := c3.MultiRead(ctx, fieldsToRead, e1, e2)
assert.NoError(t, err)
Expand Down
Loading

0 comments on commit 6c5c2de

Please sign in to comment.