Skip to content

Commit

Permalink
Add memory connector (#147)
Browse files Browse the repository at this point in the history
* Add memory connector

The memory connector can be used for testing, or when you just need an in-memory
datastore. This implementation is fairly efficient, mimicking Cassandra's handling of
ordering.

This change also modifies the contract of the connectors: they now may return more data
than originally requested, and expect the callers not to mutate the results. This is safe since
really the only caller should be the Client interface implementation (or tests).

TODO:
Range and Scan always return all the rows, ignoring the specified limit
As a result, you don't get any continuation tokens
  • Loading branch information
rkuris authored May 24, 2017
1 parent 5cb107b commit 1c234ae
Show file tree
Hide file tree
Showing 17 changed files with 1,340 additions and 118 deletions.
10 changes: 5 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (c *client) Read(ctx context.Context, fieldsToRead []string, entity DomainO
}

// map results to entity fields
re.SetFieldValues(entity, results)
re.SetFieldValues(entity, results, columnsToRead)

return nil
}
Expand Down Expand Up @@ -390,19 +390,19 @@ func (c *client) Range(ctx context.Context, r *RangeOp) ([]DomainObject, string,
return nil, "", errors.Wrap(err, "Range")
}

objectArray := objectsFromValueArray(r.sop.object, values, re)
objectArray := objectsFromValueArray(r.sop.object, values, re, nil)
return objectArray, token, nil
}

func objectsFromValueArray(object DomainObject, values []map[string]FieldValue, re *RegisteredEntity) []DomainObject {
func objectsFromValueArray(object DomainObject, values []map[string]FieldValue, re *RegisteredEntity, columnsToRead []string) []DomainObject {
goType := reflect.TypeOf(object).Elem() // get the reflect.Type of the client entity
doType := reflect.TypeOf((*DomainObject)(nil)).Elem()
slice := reflect.MakeSlice(reflect.SliceOf(doType), 0, len(values)) // make a slice of these
elements := reflect.New(slice.Type())
elements.Elem().Set(slice)
for _, flist := range values { // for each row returned
newObject := reflect.New(goType).Interface() // make a new entity
re.SetFieldValues(newObject.(DomainObject), flist) // fill it in from server values
re.SetFieldValues(newObject.(DomainObject), flist, columnsToRead) // fill it in from server values
slice = reflect.Append(slice, reflect.ValueOf(newObject.(DomainObject))) // append to slice
}
return slice.Interface().([]DomainObject)
Expand Down Expand Up @@ -434,7 +434,7 @@ func (c *client) ScanEverything(ctx context.Context, sop *ScanOp) ([]DomainObjec
if err != nil {
return nil, "", err
}
objectArray := objectsFromValueArray(sop.object, values, re)
objectArray := objectsFromValueArray(sop.object, values, re, nil)
return objectArray, token, nil

}
Expand Down
76 changes: 70 additions & 6 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"fmt"

dosaRenamed "github.com/uber-go/dosa"
"github.com/uber-go/dosa/connectors/devnull"
_ "github.com/uber-go/dosa/connectors/devnull"
_ "github.com/uber-go/dosa/connectors/memory"
"github.com/uber-go/dosa/mocks"
)

Expand All @@ -57,27 +60,88 @@ var (
ctx = context.TODO()
scope = "test"
namePrefix = "team.service"
nullConnector = &devnull.Connector{}
nullConnector dosaRenamed.Connector
)

func init() {
nullConnector, _ = dosaRenamed.GetConnector("devnull", nil)
}

// ExampleNewClient initializes a client using the devnull connector, which discards all
// the data you send it and always returns no rows. It's only useful for testing dosa.
func ExampleNewClient() {
// initialize registrar
reg, err := dosaRenamed.NewRegistrar("test", "myteam.myservice", cte1)
if err != nil {
// registration will fail if the object is tagged incorrectly
panic("dosaRenamed.NewRegister returned an error")
fmt.Printf("NewRegistrar error: %s", err)
return
}

// use a devnull connector for example purposes
conn := &devnull.Connector{}
conn, err := dosaRenamed.GetConnector("devnull", nil)
if err != nil {
fmt.Printf("GetConnector error: %s", err)
return
}

// create the client using the registry and connector
client := dosaRenamed.NewClient(reg, conn)

err = client.Initialize(context.Background())
if err != nil {
errors.Wrap(err, "client.Initialize returned an error")
fmt.Printf("Initialize error: %s", err)
return
}
}

// ExampleGetConnector gets an in-memory connector that can be used for testing your code.
// The in-memory connector always starts off with no rows, so you'll need to add rows to
// your "database" before reading them
func ExampleGetConnector() {
// register your entities so the engine can separate your data based on table names.
// Scopes and prefixes are not used by the in-memory connector, and are ignored, but
// your list of entities is important. In this case, we only have one, our ClientTestEntity1
reg, err := dosaRenamed.NewRegistrar("test", "myteam.myservice", &ClientTestEntity1{})
if err != nil {
fmt.Printf("NewRegistrar error: %s", err)
return
}

// Find the memory connector. There is no configuration information so pass a nil
// For this to work, you must force the init method of memory to run first, which happens
// when we imported memory in the import list, with an underscore to just get the side effects
conn, _ := dosaRenamed.GetConnector("memory", nil)

// now construct a client from the registry and the connector
client := dosaRenamed.NewClient(reg, conn)

// initialize the client; this should always work for the in-memory connector
if err = client.Initialize(context.Background()); err != nil {
fmt.Printf("Initialize error: %s", err)
return
}

// now populate an entity and insert it into the memory store
if err := client.CreateIfNotExists(context.Background(), &ClientTestEntity1{
ID: int64(1),
Name: "rkuris",
Email: "[email protected]"}); err != nil {
fmt.Printf("CreateIfNotExists error: %s", err)
return
}

// create an entity to hold the read result, just populate the key
e := ClientTestEntity1{ID: int64(1)}
// now read the data from the "database", all columns
err = client.Read(context.Background(), dosaRenamed.All(), &e)
if err != nil {
fmt.Printf("Read error: %s", err)
return
}
// great! It worked, so display the information we stored earlier
fmt.Printf("id:%d Name:%q Email:%q\n", e.ID, e.Name, e.Email)
// Output: id:1 Name:"rkuris" Email:"[email protected]"
}

func TestNewClient(t *testing.T) {
Expand Down Expand Up @@ -151,7 +215,7 @@ func TestClient_Read(t *testing.T) {
assert.NoError(t, c3.Initialize(ctx))
assert.NoError(t, c3.Read(ctx, fieldsToRead, cte1))
assert.Equal(t, cte1.ID, results["id"])
assert.Equal(t, cte1.Name, results["name"])
assert.NotEqual(t, cte1.Name, results["name"])
assert.Equal(t, cte1.Email, results["email"])
}

Expand Down
23 changes: 13 additions & 10 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,19 @@ type SchemaStatus struct {

// Connector is the interface that must be implemented for a backend service
// It can also be implemented using an RPC such as thrift (dosa-idl)
// When fields are returned from read/range/search/scan methods, it's legal for the connector
// to return more fields than originally requested. The caller of the connector should never mutate
// the returned columns either, in case they are from a cache
type Connector interface {
// DML operations (CRUD + search)
// CreateIfNotExists creates a row, but only if it does not exist.
CreateIfNotExists(ctx context.Context, ei *EntityInfo, values map[string]FieldValue) error
// Read fetches a row by primary key
// If fieldsToRead is empty or nil, all non-key fields would be fetched.
Read(ctx context.Context, ei *EntityInfo, keys map[string]FieldValue, fieldsToRead []string) (values map[string]FieldValue, err error)
// If minimumFields is empty or nil, all non-key fields would be fetched.
Read(ctx context.Context, ei *EntityInfo, keys map[string]FieldValue, minimumFields []string) (values map[string]FieldValue, err error)
// MultiRead fetches several rows by primary key
// If fieldsToRead is empty or nil, all non-key fields would be fetched.
MultiRead(ctx context.Context, ei *EntityInfo, keys []map[string]FieldValue, fieldsToRead []string) (results []*FieldValuesOrError, err error)
// If minimumFields is empty or nil, all non-key fields would be fetched.
MultiRead(ctx context.Context, ei *EntityInfo, keys []map[string]FieldValue, minimumFields []string) (results []*FieldValuesOrError, err error)
// Upsert updates some columns of a row, or creates a new one if it doesn't exist yet.
Upsert(ctx context.Context, ei *EntityInfo, values map[string]FieldValue) error
// MultiUpsert updates some columns of several rows, or creates a new ones if they doesn't exist yet
Expand All @@ -111,14 +114,14 @@ type Connector interface {
// MultiRemove removes multiple rows
MultiRemove(ctx context.Context, ei *EntityInfo, multiKeys []map[string]FieldValue) (result []error, err error)
// Range does a range scan using a set of conditions.
// If fieldsToRead is empty or nil, all fields (including key fields) would be fetched.
Range(ctx context.Context, ei *EntityInfo, columnConditions map[string][]*Condition, fieldsToRead []string, token string, limit int) ([]map[string]FieldValue, string, error)
// If minimumFields is empty or nil, all fields (including key fields) would be fetched.
Range(ctx context.Context, ei *EntityInfo, columnConditions map[string][]*Condition, minimumFields []string, token string, limit int) ([]map[string]FieldValue, string, error)
// Search does a search against a field marked 'searchable'
// If fieldsToRead is empty or nil, all fields (including key fields) would be fetched.
Search(ctx context.Context, ei *EntityInfo, fieldPairs FieldNameValuePair, fieldsToRead []string, token string, limit int) (multiValues []map[string]FieldValue, nextToken string, err error)
// If minimumFields is empty or nil, all fields (including key fields) would be fetched.
Search(ctx context.Context, ei *EntityInfo, fieldPairs FieldNameValuePair, minimumFields []string, token string, limit int) (multiValues []map[string]FieldValue, nextToken string, err error)
// Scan reads the whole table, for doing a sequential search or dump/load use cases
// If fieldsToRead is empty or nil, all fields (including key fields) would be fetched.
Scan(ctx context.Context, ei *EntityInfo, fieldsToRead []string, token string, limit int) (multiValues []map[string]FieldValue, nextToken string, err error)
// If minimumFields is empty or nil, all fields (including key fields) would be fetched.
Scan(ctx context.Context, ei *EntityInfo, minimumFields []string, token string, limit int) (multiValues []map[string]FieldValue, nextToken string, err error)

// DDL operations (schema)
// CheckSchema validates that the set of entities you have provided is valid and registered already
Expand Down
20 changes: 10 additions & 10 deletions connectors/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,19 @@ func (c *Connector) CreateIfNotExists(ctx context.Context, ei *dosa.EntityInfo,
}

// Read calls Next
func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue, fieldsToRead []string) (map[string]dosa.FieldValue, error) {
func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue, minimumFields []string) (map[string]dosa.FieldValue, error) {
if c.Next == nil {
return nil, ErrNoMoreConnector{}
}
return c.Next.Read(ctx, ei, values, fieldsToRead)
return c.Next.Read(ctx, ei, values, minimumFields)
}

// MultiRead calls Next
func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, values []map[string]dosa.FieldValue, fieldsToRead []string) ([]*dosa.FieldValuesOrError, error) {
func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, values []map[string]dosa.FieldValue, minimumFields []string) ([]*dosa.FieldValuesOrError, error) {
if c.Next == nil {
return nil, ErrNoMoreConnector{}
}
return c.Next.MultiRead(ctx, ei, values, fieldsToRead)
return c.Next.MultiRead(ctx, ei, values, minimumFields)
}

// Upsert calls Next
Expand Down Expand Up @@ -102,27 +102,27 @@ func (c *Connector) MultiRemove(ctx context.Context, ei *dosa.EntityInfo, multiV
}

// Range calls Next
func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
if c.Next == nil {
return nil, "", ErrNoMoreConnector{}
}
return c.Next.Range(ctx, ei, columnConditions, fieldsToRead, token, limit)
return c.Next.Range(ctx, ei, columnConditions, minimumFields, token, limit)
}

// Search calls Next
func (c *Connector) Search(ctx context.Context, ei *dosa.EntityInfo, fieldPairs dosa.FieldNameValuePair, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
func (c *Connector) Search(ctx context.Context, ei *dosa.EntityInfo, fieldPairs dosa.FieldNameValuePair, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
if c.Next == nil {
return nil, "", ErrNoMoreConnector{}
}
return c.Next.Search(ctx, ei, fieldPairs, fieldsToRead, token, limit)
return c.Next.Search(ctx, ei, fieldPairs, minimumFields, token, limit)
}

// Scan calls Next
func (c *Connector) Scan(ctx context.Context, ei *dosa.EntityInfo, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
func (c *Connector) Scan(ctx context.Context, ei *dosa.EntityInfo, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
if c.Next == nil {
return nil, "", ErrNoMoreConnector{}
}
return c.Next.Scan(ctx, ei, fieldsToRead, token, limit)
return c.Next.Scan(ctx, ei, minimumFields, token, limit)
}

// CheckSchema calls Next
Expand Down
30 changes: 15 additions & 15 deletions connectors/base/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,21 @@ func TestBase_CreateIfNotExists(t *testing.T) {
}

func TestBase_Read(t *testing.T) {
fieldsToRead := make([]string, 1)
_, err := bc.Read(ctx, testInfo, testValues, fieldsToRead)
minimumFields := make([]string, 1)
_, err := bc.Read(ctx, testInfo, testValues, minimumFields)
assert.Error(t, err)

val, err := bcWNext.Read(ctx, testInfo, testValues, fieldsToRead)
val, err := bcWNext.Read(ctx, testInfo, testValues, minimumFields)
assert.Nil(t, val)
assert.Error(t, err)
}

func TestBase_MultiRead(t *testing.T) {
fieldsToRead := make([]string, 1)
_, e := bc.MultiRead(ctx, testInfo, testMultiValues, fieldsToRead)
minimumFields := make([]string, 1)
_, e := bc.MultiRead(ctx, testInfo, testMultiValues, minimumFields)
assert.Error(t, e)

v, e := bcWNext.MultiRead(ctx, testInfo, testMultiValues, fieldsToRead)
v, e := bcWNext.MultiRead(ctx, testInfo, testMultiValues, minimumFields)
assert.NotNil(t, v)
assert.Nil(t, e)
}
Expand Down Expand Up @@ -113,31 +113,31 @@ func TestBase_MultiRemove(t *testing.T) {

func TestBase_Range(t *testing.T) {
conditions := make(map[string][]*dosa.Condition)
fieldsToRead := make([]string, 1)
_, _, err := bc.Range(ctx, testInfo, conditions, fieldsToRead, "", 0)
minimumFields := make([]string, 1)
_, _, err := bc.Range(ctx, testInfo, conditions, minimumFields, "", 0)
assert.Error(t, err)

vals, _, err := bcWNext.Range(ctx, testInfo, conditions, fieldsToRead, "", 0)
vals, _, err := bcWNext.Range(ctx, testInfo, conditions, minimumFields, "", 0)
assert.Nil(t, vals)
assert.Error(t, err)
}

func TestBase_Search(t *testing.T) {
fieldsToRead := make([]string, 1)
_, _, err := bc.Search(ctx, testInfo, testPairs, fieldsToRead, "", 0)
minimumFields := make([]string, 1)
_, _, err := bc.Search(ctx, testInfo, testPairs, minimumFields, "", 0)
assert.Error(t, err)

vals, _, err := bcWNext.Search(ctx, testInfo, testPairs, fieldsToRead, "", 0)
vals, _, err := bcWNext.Search(ctx, testInfo, testPairs, minimumFields, "", 0)
assert.Nil(t, vals)
assert.Error(t, err)
}

func TestBase_Scan(t *testing.T) {
fieldsToRead := make([]string, 1)
_, _, err := bc.Scan(ctx, testInfo, fieldsToRead, "", 0)
minimumFields := make([]string, 1)
_, _, err := bc.Scan(ctx, testInfo, minimumFields, "", 0)
assert.Error(t, err)

vals, _, err := bcWNext.Scan(ctx, testInfo, fieldsToRead, "", 0)
vals, _, err := bcWNext.Scan(ctx, testInfo, minimumFields, "", 0)
assert.Nil(t, vals)
assert.Error(t, err)
}
Expand Down
10 changes: 5 additions & 5 deletions connectors/devnull/devnull.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ func (c *Connector) CreateIfNotExists(ctx context.Context, ei *dosa.EntityInfo,
}

// Read always returns a not found error
func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue, fieldsToRead []string) (map[string]dosa.FieldValue, error) {
func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue, minimumFields []string) (map[string]dosa.FieldValue, error) {
return nil, &dosa.ErrNotFound{}
}

// MultiRead returns a set of not found errors for each key you specify
func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, values []map[string]dosa.FieldValue, fieldsToRead []string) ([]*dosa.FieldValuesOrError, error) {
func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, values []map[string]dosa.FieldValue, minimumFields []string) ([]*dosa.FieldValuesOrError, error) {
errors := make([]*dosa.FieldValuesOrError, len(values))
for inx := range values {
errors[inx] = &dosa.FieldValuesOrError{Error: &dosa.ErrNotFound{}}
Expand Down Expand Up @@ -78,17 +78,17 @@ func (c *Connector) MultiRemove(ctx context.Context, ei *dosa.EntityInfo, multiV
}

// Range is not yet implementedS
func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
return nil, "", &dosa.ErrNotFound{}
}

// Search is not yet implemented
func (c *Connector) Search(ctx context.Context, ei *dosa.EntityInfo, fieldPairs dosa.FieldNameValuePair, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
func (c *Connector) Search(ctx context.Context, ei *dosa.EntityInfo, fieldPairs dosa.FieldNameValuePair, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
return nil, "", &dosa.ErrNotFound{}
}

// Scan is not yet implemented
func (c *Connector) Scan(ctx context.Context, ei *dosa.EntityInfo, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
func (c *Connector) Scan(ctx context.Context, ei *dosa.EntityInfo, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
return nil, "", &dosa.ErrNotFound{}
}

Expand Down
Loading

0 comments on commit 1c234ae

Please sign in to comment.