diff --git a/.github/infrastructure/docker-compose-cockroachdb.yml b/.github/infrastructure/docker-compose-cockroachdb.yml new file mode 100644 index 0000000000..2fa6bff9e4 --- /dev/null +++ b/.github/infrastructure/docker-compose-cockroachdb.yml @@ -0,0 +1,19 @@ +version: '2' +services: + cockroachdb: + image: cockroachdb/cockroach:v21.2.3 + hostname: cockroachdb + command: start-single-node --cluster-name=single-node --logtostderr=WARNING --log-file-verbosity=WARNING --insecure + restart: always + ports: + - "26257:26257" + + cockroachdb-init: + hostname: cockroachdb-init + image: timveil/cockroachdb-remote-client:latest + environment: + - COCKROACH_HOST=cockroachdb:26257 + - COCKROACH_INSECURE=true + - DATABASE_NAME=dapr_test + depends_on: + - cockroachdb \ No newline at end of file diff --git a/.github/workflows/conformance.yml b/.github/workflows/conformance.yml index babd043ea4..07b413b199 100644 --- a/.github/workflows/conformance.yml +++ b/.github/workflows/conformance.yml @@ -79,6 +79,7 @@ jobs: - state.postgresql - state.redis - state.sqlserver + - state.cockroachdb EOF ) echo "::set-output name=pr-components::$PR_COMPONENTS" @@ -301,6 +302,11 @@ jobs: docker-compose -f ./.github/infrastructure/docker-compose-cassandra.yml -p cassandra up -d if: contains(matrix.component, 'cassandra') + - name: Start cockroachdb + run: | + docker-compose -f ./.github/infrastructure/docker-compose-cockroachdb.yml -p cockroachdb up -d + if: contains(matrix.component, 'cockroachdb') + - name: Setup KinD test data if: contains(matrix.component, 'kubernetes') run: | diff --git a/state/cockroachdb/cockroachdb.go b/state/cockroachdb/cockroachdb.go new file mode 100644 index 0000000000..189a8e17da --- /dev/null +++ b/state/cockroachdb/cockroachdb.go @@ -0,0 +1,108 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cockroachdb + +import ( + "github.com/dapr/components-contrib/state" + "github.com/dapr/kit/logger" +) + +// CockroachDB state store. +type CockroachDB struct { + features []state.Feature + logger logger.Logger + dbaccess dbAccess +} + +// New creates a new instance of CockroachDB state store. +func New(logger logger.Logger) *CockroachDB { + dba := newCockroachDBAccess(logger) + + return internalNew(logger, dba) +} + +// internalNew creates a new instance of a CockroachDB state store. +// This unexported constructor allows injecting a dbAccess instance for unit testing. +func internalNew(logger logger.Logger, dba dbAccess) *CockroachDB { + return &CockroachDB{ + features: []state.Feature{state.FeatureETag, state.FeatureTransactional}, + logger: logger, + dbaccess: dba, + } +} + +// Init initializes the CockroachDB state store. +func (c *CockroachDB) Init(metadata state.Metadata) error { + return c.dbaccess.Init(metadata) +} + +// Features returns the features available in this state store. +func (c *CockroachDB) Features() []state.Feature { + return c.features +} + +// Delete removes an entity from the store. +func (c *CockroachDB) Delete(req *state.DeleteRequest) error { + return c.dbaccess.Delete(req) +} + +// Get returns an entity from store. +func (c *CockroachDB) Get(req *state.GetRequest) (*state.GetResponse, error) { + return c.dbaccess.Get(req) +} + +// Set adds/updates an entity on store. +func (c *CockroachDB) Set(req *state.SetRequest) error { + return c.dbaccess.Set(req) +} + +// Ping checks if database is available. +func (c *CockroachDB) Ping() error { + return c.dbaccess.Ping() +} + +// BulkDelete removes multiple entries from the store. +func (c *CockroachDB) BulkDelete(req []state.DeleteRequest) error { + return c.dbaccess.BulkDelete(req) +} + +// BulkGet performs a bulks get operations. +func (c *CockroachDB) BulkGet(req []state.GetRequest) (bool, []state.BulkGetResponse, error) { + // TODO: replace with ExecuteMulti for performance. + return false, nil, nil +} + +// BulkSet adds/updates multiple entities on store. +func (c *CockroachDB) BulkSet(req []state.SetRequest) error { + return c.dbaccess.BulkSet(req) +} + +// Multi handles multiple transactions. Implements TransactionalStore. +func (c *CockroachDB) Multi(request *state.TransactionalStateRequest) error { + return c.dbaccess.ExecuteMulti(request) +} + +// Query executes a query against store. +func (c *CockroachDB) Query(req *state.QueryRequest) (*state.QueryResponse, error) { + return c.dbaccess.Query(req) +} + +// Close implements io.Closer. +func (c *CockroachDB) Close() error { + if c.dbaccess != nil { + return c.dbaccess.Close() + } + + return nil +} diff --git a/state/cockroachdb/cockroachdb_access.go b/state/cockroachdb/cockroachdb_access.go new file mode 100644 index 0000000000..36d1e0f9fc --- /dev/null +++ b/state/cockroachdb/cockroachdb_access.go @@ -0,0 +1,476 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cockroachdb + +import ( + "database/sql" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "strconv" + + "github.com/agrea/ptr" + + "github.com/dapr/components-contrib/state" + "github.com/dapr/components-contrib/state/query" + "github.com/dapr/components-contrib/state/utils" + "github.com/dapr/kit/logger" + + // Blank import for the underlying PostgreSQL driver. + _ "github.com/jackc/pgx/v4/stdlib" +) + +const ( + connectionStringKey = "connectionString" + errMissingConnectionString = "missing connection string" + tableName = "state" +) + +// cockroachDBAccess implements dbaccess. +type cockroachDBAccess struct { + logger logger.Logger + metadata state.Metadata + db *sql.DB + connectionString string +} + +// newCockroachDBAccess creates a new instance of cockroachDBAccess. +func newCockroachDBAccess(logger logger.Logger) *cockroachDBAccess { + logger.Debug("Instantiating new CockroachDB state store") + + return &cockroachDBAccess{ + logger: logger, + metadata: state.Metadata{ + Properties: map[string]string{}, + }, + db: nil, + connectionString: "", + } +} + +// Init sets up CockroachDB connection and ensures that the state table exists. +func (p *cockroachDBAccess) Init(metadata state.Metadata) error { + p.logger.Debug("Initializing CockroachDB state store") + + p.metadata = metadata + + if val, ok := metadata.Properties[connectionStringKey]; ok && val != "" { + p.connectionString = val + } else { + p.logger.Error("Missing CockroachDB connection string") + + return fmt.Errorf(errMissingConnectionString) + } + + databaseConn, err := sql.Open("pgx", p.connectionString) + if err != nil { + p.logger.Error(err) + + return err + } + + p.db = databaseConn + + if err = databaseConn.Ping(); err != nil { + return err + } + + if err = p.ensureStateTable(tableName); err != nil { + return err + } + + return nil +} + +// Set makes an insert or update to the database. +func (p *cockroachDBAccess) Set(req *state.SetRequest) error { + return state.SetWithOptions(p.setValue, req) +} + +// setValue is an internal implementation of set to enable passing the logic to state.SetWithRetries as a func. +func (p *cockroachDBAccess) setValue(req *state.SetRequest) error { + p.logger.Debug("Setting state value in CockroachDB") + + value, isBinary, err := validateAndReturnValue(req) + if err != nil { + return err + } + + var result sql.Result + + // Sprintf is required for table name because sql.DB does not substitute parameters for table names. + // Other parameters use sql.DB parameter substitution. + if req.ETag == nil { + result, err = p.db.Exec(fmt.Sprintf( + `INSERT INTO %s (key, value, isbinary, etag) VALUES ($1, $2, $3, 1) + ON CONFLICT (key) DO UPDATE SET value = $2, isbinary = $3, updatedate = NOW(), etag = EXCLUDED.etag + 1;`, + tableName), req.Key, value, isBinary) + } else { + var etag64 uint64 + etag64, err = strconv.ParseUint(*req.ETag, 10, 32) + if err != nil { + return state.NewETagError(state.ETagInvalid, err) + } + etag := uint32(etag64) + + // When an etag is provided do an update - no insert. + result, err = p.db.Exec(fmt.Sprintf( + `UPDATE %s SET value = $1, isbinary = $2, updatedate = NOW(), etag = etag + 1 + WHERE key = $3 AND etag = $4;`, + tableName), value, isBinary, req.Key, etag) + } + + if err != nil { + return err + } + + rows, err := result.RowsAffected() + if err != nil { + return err + } + + if rows != 1 { + return fmt.Errorf("no item was updated") + } + + return nil +} + +func (p *cockroachDBAccess) BulkSet(req []state.SetRequest) error { + p.logger.Debug("Executing BulkSet request") + tx, err := p.db.Begin() + if err != nil { + return err + } + + if len(req) > 0 { + for _, s := range req { + sa := s // Fix for gosec G601: Implicit memory aliasing in for loop. + err = p.Set(&sa) + if err != nil { + tx.Rollback() + + return err + } + } + } + + err = tx.Commit() + + return err +} + +// Get returns data from the database. If data does not exist for the key an empty state.GetResponse will be returned. +func (p *cockroachDBAccess) Get(req *state.GetRequest) (*state.GetResponse, error) { + p.logger.Debug("Getting state value from CockroachDB") + if req.Key == "" { + return nil, fmt.Errorf("missing key in get operation") + } + + var value string + var isBinary bool + var etag int + err := p.db.QueryRow(fmt.Sprintf("SELECT value, isbinary, etag FROM %s WHERE key = $1", tableName), req.Key).Scan(&value, &isBinary, &etag) + if err != nil { + // If no rows exist, return an empty response, otherwise return the error. + if errors.Is(err, sql.ErrNoRows) { + return &state.GetResponse{}, nil + } + + return nil, err + } + + if isBinary { + var dataS string + var data []byte + + if err = json.Unmarshal([]byte(value), &dataS); err != nil { + return nil, err + } + + if data, err = base64.StdEncoding.DecodeString(dataS); err != nil { + return nil, err + } + + return &state.GetResponse{ + Data: data, + ETag: ptr.String(strconv.Itoa(etag)), + Metadata: req.Metadata, + ContentType: nil, + }, nil + } + + return &state.GetResponse{ + Data: []byte(value), + ETag: ptr.String(strconv.Itoa(etag)), + Metadata: req.Metadata, + ContentType: nil, + }, nil +} + +// Delete removes an item from the state store. +func (p *cockroachDBAccess) Delete(req *state.DeleteRequest) error { + return state.DeleteWithOptions(p.deleteValue, req) +} + +// deleteValue is an internal implementation of delete to enable passing the logic to state.DeleteWithRetries as a func. +func (p *cockroachDBAccess) deleteValue(req *state.DeleteRequest) error { + p.logger.Debug("Deleting state value from CockroachDB") + if req.Key == "" { + return fmt.Errorf("missing key in delete operation") + } + + var result sql.Result + var err error + + if req.ETag == nil { + result, err = p.db.Exec("DELETE FROM state WHERE key = $1", req.Key) + } else { + var etag64 uint64 + etag64, err = strconv.ParseUint(*req.ETag, 10, 32) + if err != nil { + return state.NewETagError(state.ETagInvalid, err) + } + etag := uint32(etag64) + + result, err = p.db.Exec("DELETE FROM state WHERE key = $1 and etag = $2", req.Key, etag) + } + + if err != nil { + return err + } + + rows, err := result.RowsAffected() + if err != nil { + return err + } + + if rows != 1 && req.ETag != nil && *req.ETag != "" { + return state.NewETagError(state.ETagMismatch, nil) + } + + return nil +} + +func (p *cockroachDBAccess) BulkDelete(req []state.DeleteRequest) error { + p.logger.Debug("Executing BulkDelete request") + tx, err := p.db.Begin() + if err != nil { + return err + } + + if len(req) > 0 { + for _, d := range req { + da := d // Fix for gosec G601: Implicit memory aliasing in for loop. + err = p.Delete(&da) + if err != nil { + tx.Rollback() + + return err + } + } + } + + err = tx.Commit() + + return err +} + +func (p *cockroachDBAccess) ExecuteMulti(request *state.TransactionalStateRequest) error { + p.logger.Debug("Executing PostgreSQL transaction") + + tx, err := p.db.Begin() + if err != nil { + return err + } + + for _, o := range request.Operations { + switch o.Operation { + case state.Upsert: + var setReq state.SetRequest + + setReq, err = getSet(o) + if err != nil { + tx.Rollback() + return err + } + + err = p.Set(&setReq) + if err != nil { + tx.Rollback() + return err + } + + case state.Delete: + var delReq state.DeleteRequest + + delReq, err = getDelete(o) + if err != nil { + tx.Rollback() + return err + } + + err = p.Delete(&delReq) + if err != nil { + tx.Rollback() + return err + } + + default: + tx.Rollback() + return fmt.Errorf("unsupported operation: %s", o.Operation) + } + } + + err = tx.Commit() + + return err +} + +// Query executes a query against store. +func (p *cockroachDBAccess) Query(req *state.QueryRequest) (*state.QueryResponse, error) { + p.logger.Debug("Getting query value from CockroachDB") + + stateQuery := &Query{ + query: "", + params: []interface{}{}, + limit: 0, + skip: ptr.Int64(0), + } + qbuilder := query.NewQueryBuilder(stateQuery) + if err := qbuilder.BuildQuery(&req.Query); err != nil { + return &state.QueryResponse{ + Results: []state.QueryItem{}, + Token: "", + Metadata: map[string]string{}, + }, err + } + + p.logger.Debug("Query: " + stateQuery.query) + + data, token, err := stateQuery.execute(p.logger, p.db) + if err != nil { + return &state.QueryResponse{ + Results: []state.QueryItem{}, + Token: "", + Metadata: map[string]string{}, + }, err + } + + return &state.QueryResponse{ + Results: data, + Token: token, + Metadata: map[string]string{}, + }, nil +} + +// Ping implements database ping. +func (p *cockroachDBAccess) Ping() error { + return p.db.Ping() +} + +// Close implements io.Close. +func (p *cockroachDBAccess) Close() error { + if p.db != nil { + return p.db.Close() + } + + return nil +} + +func (p *cockroachDBAccess) ensureStateTable(stateTableName string) error { + exists, err := tableExists(p.db, stateTableName) + if err != nil { + return err + } + + if !exists { + p.logger.Info("Creating CockroachDB state table") + createTable := fmt.Sprintf(`CREATE TABLE %s ( + key text NOT NULL PRIMARY KEY, + value jsonb NOT NULL, + isbinary boolean NOT NULL, + etag INT, + insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + updatedate TIMESTAMP WITH TIME ZONE NULL);`, stateTableName) + _, err = p.db.Exec(createTable) + if err != nil { + return err + } + } + + return nil +} + +func tableExists(db *sql.DB, tableName string) (bool, error) { + exists := false + err := db.QueryRow("SELECT EXISTS (SELECT * FROM pg_tables where tablename = $1)", tableName).Scan(&exists) + + return exists, err +} + +func validateAndReturnValue(request *state.SetRequest) (value string, isBinary bool, err error) { + err = state.CheckRequestOptions(request.Options) + if err != nil { + return "", false, err + } + + if request.Key == "" { + return "", false, fmt.Errorf("missing key in set operation") + } + + if v, ok := request.Value.(string); ok && v == "" { + return "", false, fmt.Errorf("empty string is not allowed in set operation") + } + + requestValue := request.Value + byteArray, isBinary := request.Value.([]uint8) + if isBinary { + requestValue = base64.StdEncoding.EncodeToString(byteArray) + } + + // Convert to json string. + bt, _ := utils.Marshal(requestValue, json.Marshal) + + return string(bt), isBinary, nil +} + +// Returns the set requests. +func getSet(req state.TransactionalStateOperation) (state.SetRequest, error) { + setReq, ok := req.Request.(state.SetRequest) + if !ok { + return setReq, fmt.Errorf("expecting set request") + } + + if setReq.Key == "" { + return setReq, fmt.Errorf("missing key in upsert operation") + } + + return setReq, nil +} + +// Returns the delete requests. +func getDelete(req state.TransactionalStateOperation) (state.DeleteRequest, error) { + delReq, ok := req.Request.(state.DeleteRequest) + if !ok { + return delReq, fmt.Errorf("expecting delete request") + } + + if delReq.Key == "" { + return delReq, fmt.Errorf("missing key in delete operation") + } + + return delReq, nil +} diff --git a/state/cockroachdb/cockroachdb_access_test.go b/state/cockroachdb/cockroachdb_access_test.go new file mode 100644 index 0000000000..e59fb8c861 --- /dev/null +++ b/state/cockroachdb/cockroachdb_access_test.go @@ -0,0 +1,462 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cockroachdb + +import ( + "database/sql" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" + + "github.com/dapr/components-contrib/state" + "github.com/dapr/kit/logger" +) + +type mocks struct { + db *sql.DB + mock sqlmock.Sqlmock + roachDba *cockroachDBAccess +} + +func TestGetSetWithWrongType(t *testing.T) { + t.Parallel() + operation := state.TransactionalStateOperation{ + Operation: state.Delete, + Request: state.DeleteRequest{}, // Delete request is not valid for getSets + } + + _, err := getSet(operation) + assert.NotNil(t, err) +} + +func TestGetSetWithNoKey(t *testing.T) { + t.Parallel() + operation := state.TransactionalStateOperation{ + Operation: state.Upsert, + Request: state.SetRequest{Value: "value1"}, // Set request with no key is invalid + } + + _, err := getSet(operation) + assert.NotNil(t, err) +} + +func TestGetSetValid(t *testing.T) { + t.Parallel() + operation := state.TransactionalStateOperation{ + Operation: state.Upsert, + Request: state.SetRequest{Key: "key1", Value: "value1"}, + } + + set, err := getSet(operation) + assert.Nil(t, err) + assert.Equal(t, "key1", set.Key) +} + +func TestGetDeleteWithWrongType(t *testing.T) { + t.Parallel() + operation := state.TransactionalStateOperation{ + Operation: state.Upsert, + Request: state.SetRequest{Value: "value1"}, // Set request is not valid for getDeletes + } + + _, err := getDelete(operation) + assert.NotNil(t, err) +} + +func TestGetDeleteWithNoKey(t *testing.T) { + t.Parallel() + operation := state.TransactionalStateOperation{ + Operation: state.Delete, + Request: state.DeleteRequest{}, // Delete request with no key is invalid + } + + _, err := getDelete(operation) + assert.NotNil(t, err) +} + +func TestGetDeleteValid(t *testing.T) { + t.Parallel() + operation := state.TransactionalStateOperation{ + Operation: state.Delete, + Request: state.DeleteRequest{Key: "key1"}, + } + + delete, err := getDelete(operation) + assert.Nil(t, err) + assert.Equal(t, "key1", delete.Key) +} + +func TestMultiWithNoRequests(t *testing.T) { + // Arrange + m, _ := mockDatabase(t) + defer m.db.Close() + + m.mock.ExpectBegin() + m.mock.ExpectCommit() + + var operations []state.TransactionalStateOperation + + // Act + err := m.roachDba.ExecuteMulti(&state.TransactionalStateRequest{ + Operations: operations, + }) + + // Assert + assert.Nil(t, err) +} + +func TestInvalidMultiInvalidAction(t *testing.T) { + // Arrange + m, _ := mockDatabase(t) + defer m.db.Close() + + m.mock.ExpectBegin() + m.mock.ExpectRollback() + + var operations []state.TransactionalStateOperation + + operations = append(operations, state.TransactionalStateOperation{ + Operation: "Something invalid", + Request: createSetRequest(), + }) + + // Act + err := m.roachDba.ExecuteMulti(&state.TransactionalStateRequest{ + Operations: operations, + }) + + // Assert + assert.NotNil(t, err) +} + +func TestValidSetRequest(t *testing.T) { + // Arrange + m, _ := mockDatabase(t) + defer m.db.Close() + + m.mock.ExpectBegin() + m.mock.ExpectExec("INSERT INTO").WillReturnResult(sqlmock.NewResult(1, 1)) + m.mock.ExpectCommit() + + var operations []state.TransactionalStateOperation + + operations = append(operations, state.TransactionalStateOperation{ + Operation: state.Upsert, + Request: createSetRequest(), + }) + + // Act + err := m.roachDba.ExecuteMulti(&state.TransactionalStateRequest{ + Operations: operations, + }) + + // Assert + assert.Nil(t, err) +} + +func TestInvalidMultiSetRequest(t *testing.T) { + // Arrange + m, _ := mockDatabase(t) + defer m.db.Close() + + m.mock.ExpectBegin() + m.mock.ExpectRollback() + + var operations []state.TransactionalStateOperation + + operations = append(operations, state.TransactionalStateOperation{ + Operation: state.Upsert, + Request: createDeleteRequest(), // Delete request is not valid for Upsert operation + }) + + // Act + err := m.roachDba.ExecuteMulti(&state.TransactionalStateRequest{ + Operations: operations, + }) + + // Assert + assert.NotNil(t, err) +} + +func TestInvalidMultiSetRequestNoKey(t *testing.T) { + // Arrange + m, _ := mockDatabase(t) + defer m.db.Close() + + m.mock.ExpectBegin() + m.mock.ExpectRollback() + + var operations []state.TransactionalStateOperation + + operations = append(operations, state.TransactionalStateOperation{ + Operation: state.Upsert, + Request: state.SetRequest{Value: "value1"}, // Set request without key is not valid for Upsert operation + }) + + // Act + err := m.roachDba.ExecuteMulti(&state.TransactionalStateRequest{ + Operations: operations, + }) + + // Assert + assert.NotNil(t, err) +} + +func TestValidMultiDeleteRequest(t *testing.T) { + // Arrange + m, _ := mockDatabase(t) + defer m.db.Close() + + m.mock.ExpectBegin() + m.mock.ExpectExec("DELETE FROM").WillReturnResult(sqlmock.NewResult(1, 1)) + m.mock.ExpectCommit() + + var operations []state.TransactionalStateOperation + + operations = append(operations, state.TransactionalStateOperation{ + Operation: state.Delete, + Request: createDeleteRequest(), + }) + + // Act + err := m.roachDba.ExecuteMulti(&state.TransactionalStateRequest{ + Operations: operations, + }) + + // Assert + assert.Nil(t, err) +} + +func TestInvalidMultiDeleteRequest(t *testing.T) { + // Arrange + m, _ := mockDatabase(t) + defer m.db.Close() + + m.mock.ExpectBegin() + m.mock.ExpectRollback() + + var operations []state.TransactionalStateOperation + + operations = append(operations, state.TransactionalStateOperation{ + Operation: state.Delete, + Request: createSetRequest(), // Set request is not valid for Delete operation + }) + + // Act + err := m.roachDba.ExecuteMulti(&state.TransactionalStateRequest{ + Operations: operations, + }) + + // Assert + assert.NotNil(t, err) +} + +func TestInvalidMultiDeleteRequestNoKey(t *testing.T) { + // Arrange + m, _ := mockDatabase(t) + defer m.db.Close() + + m.mock.ExpectBegin() + m.mock.ExpectRollback() + + var operations []state.TransactionalStateOperation + + operations = append(operations, state.TransactionalStateOperation{ + Operation: state.Delete, + Request: state.DeleteRequest{}, // Delete request without key is not valid for Delete operation + }) + + // Act + err := m.roachDba.ExecuteMulti(&state.TransactionalStateRequest{ + Operations: operations, + }) + + // Assert + assert.NotNil(t, err) +} + +func TestMultiOperationOrder(t *testing.T) { + // Arrange + m, _ := mockDatabase(t) + defer m.db.Close() + + m.mock.ExpectBegin() + m.mock.ExpectExec("INSERT INTO").WillReturnResult(sqlmock.NewResult(1, 1)) + m.mock.ExpectExec("DELETE FROM").WillReturnResult(sqlmock.NewResult(1, 1)) + m.mock.ExpectCommit() + + var operations []state.TransactionalStateOperation + + operations = append(operations, + state.TransactionalStateOperation{ + Operation: state.Upsert, + Request: state.SetRequest{Key: "key1", Value: "value1"}, + }, + state.TransactionalStateOperation{ + Operation: state.Delete, + Request: state.DeleteRequest{Key: "key1"}, + }, + ) + + // Act + err := m.roachDba.ExecuteMulti(&state.TransactionalStateRequest{ + Operations: operations, + }) + + // Assert + assert.Nil(t, err) +} + +func TestInvalidBulkSetNoKey(t *testing.T) { + // Arrange + m, _ := mockDatabase(t) + defer m.db.Close() + + m.mock.ExpectBegin() + m.mock.ExpectRollback() + + var sets []state.SetRequest + + sets = append(sets, state.SetRequest{ // Set request without key is not valid for Set operation + Value: "value1", + }) + + // Act + err := m.roachDba.BulkSet(sets) + + // Assert + assert.NotNil(t, err) +} + +func TestInvalidBulkSetEmptyValue(t *testing.T) { + // Arrange + m, _ := mockDatabase(t) + defer m.db.Close() + + m.mock.ExpectBegin() + m.mock.ExpectRollback() + + var sets []state.SetRequest + + sets = append(sets, state.SetRequest{ // Set request without value is not valid for Set operation + Key: "key1", + Value: "", + }) + + // Act + err := m.roachDba.BulkSet(sets) + + // Assert + assert.NotNil(t, err) +} + +func TestValidBulkSet(t *testing.T) { + // Arrange + m, _ := mockDatabase(t) + defer m.db.Close() + + m.mock.ExpectBegin() + m.mock.ExpectExec("INSERT INTO").WillReturnResult(sqlmock.NewResult(1, 1)) + m.mock.ExpectCommit() + + var sets []state.SetRequest + + sets = append(sets, state.SetRequest{ + Key: "key1", + Value: "value1", + }) + + // Act + err := m.roachDba.BulkSet(sets) + + // Assert + assert.Nil(t, err) +} + +func TestInvalidBulkDeleteNoKey(t *testing.T) { + // Arrange + m, _ := mockDatabase(t) + defer m.db.Close() + + m.mock.ExpectBegin() + m.mock.ExpectRollback() + + var deletes []state.DeleteRequest + + deletes = append(deletes, state.DeleteRequest{ // Delete request without key is not valid for Delete operation + Key: "", + }) + + // Act + err := m.roachDba.BulkDelete(deletes) + + // Assert + assert.NotNil(t, err) +} + +func TestValidBulkDelete(t *testing.T) { + // Arrange + m, _ := mockDatabase(t) + defer m.db.Close() + + m.mock.ExpectBegin() + m.mock.ExpectExec("DELETE FROM").WillReturnResult(sqlmock.NewResult(1, 1)) + m.mock.ExpectCommit() + + var deletes []state.DeleteRequest + + deletes = append(deletes, state.DeleteRequest{ + Key: "key1", + }) + + // Act + err := m.roachDba.BulkDelete(deletes) + + // Assert + assert.Nil(t, err) +} + +func createSetRequest() state.SetRequest { + return state.SetRequest{ + Key: randomKey(), + Value: randomJSON(), + } +} + +func createDeleteRequest() state.DeleteRequest { + return state.DeleteRequest{ + Key: randomKey(), + } +} + +func mockDatabase(t *testing.T) (*mocks, error) { + logger := logger.NewLogger("test") + + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + + dba := &cockroachDBAccess{ + logger: logger, + db: db, + } + + return &mocks{ + db: db, + mock: mock, + roachDba: dba, + }, err +} diff --git a/state/cockroachdb/cockroachdb_integration_test.go b/state/cockroachdb/cockroachdb_integration_test.go new file mode 100644 index 0000000000..b0cd2fa69b --- /dev/null +++ b/state/cockroachdb/cockroachdb_integration_test.go @@ -0,0 +1,726 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cockroachdb + +import ( + "database/sql" + "encoding/json" + "fmt" + "os" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + + "github.com/dapr/components-contrib/state" + "github.com/dapr/kit/logger" +) + +const ( + connectionStringEnvKey = "DAPR_TEST_COCKROACHDB_CONNSTRING" // Environment variable containing the connection string. +) + +type fakeItem struct { + Color string +} + +func TestCockroachDBIntegration(t *testing.T) { + t.Parallel() + + connectionString := getConnectionString() + if connectionString == "" { + t.Skipf("CockroachDB state integration tests skipped. To enable define the connection string using environment variable '%s' (example 'export %s=\"host=localhost user=postgres password=example port=5432 connect_timeout=10 database=dapr_test\")", connectionStringEnvKey, connectionStringEnvKey) + } + + t.Run("Test init configurations", func(t *testing.T) { + t.Parallel() + testInitConfiguration(t) + }) + + metadata := state.Metadata{ + Properties: map[string]string{connectionStringKey: connectionString}, + } + + pgs := New(logger.NewLogger("test")) + t.Cleanup(func() { + defer pgs.Close() + }) + + if err := pgs.Init(metadata); err != nil { + t.Fatal(err) + } + + t.Run("Create table succeeds", func(t *testing.T) { + t.Parallel() + + dbAccess, ok := pgs.dbaccess.(*cockroachDBAccess) + assert.True(t, ok) + + testCreateTable(t, dbAccess) + }) + + t.Run("Get Set Delete one item", func(t *testing.T) { + t.Parallel() + setGetUpdateDeleteOneItem(t, pgs) + }) + + t.Run("Get item that does not exist", func(t *testing.T) { + t.Parallel() + getItemThatDoesNotExist(t, pgs) + }) + + t.Run("Get item with no key fails", func(t *testing.T) { + t.Parallel() + getItemWithNoKey(t, pgs) + }) + + t.Run("Set updates the updatedate field", func(t *testing.T) { + t.Parallel() + setUpdatesTheUpdatedateField(t, pgs) + }) + + t.Run("Set item with no key fails", func(t *testing.T) { + t.Parallel() + setItemWithNoKey(t, pgs) + }) + + t.Run("Bulk set and bulk delete", func(t *testing.T) { + t.Parallel() + testBulkSetAndBulkDelete(t, pgs) + }) + + t.Run("Update and delete with etag succeeds", func(t *testing.T) { + t.Parallel() + updateAndDeleteWithEtagSucceeds(t, pgs) + }) + + t.Run("Update with old etag fails", func(t *testing.T) { + t.Parallel() + updateWithOldEtagFails(t, pgs) + }) + + t.Run("Insert with etag fails", func(t *testing.T) { + t.Parallel() + newItemWithEtagFails(t, pgs) + }) + + t.Run("Delete with invalid etag fails", func(t *testing.T) { + t.Parallel() + deleteWithInvalidEtagFails(t, pgs) + }) + + t.Run("Delete item with no key fails", func(t *testing.T) { + t.Parallel() + deleteWithNoKeyFails(t, pgs) + }) + + t.Run("Delete an item that does not exist", func(t *testing.T) { + t.Parallel() + deleteItemThatDoesNotExist(t, pgs) + }) + + t.Run("Multi with delete and set", func(t *testing.T) { + t.Parallel() + multiWithDeleteAndSet(t, pgs) + }) + + t.Run("Multi with delete only", func(t *testing.T) { + t.Parallel() + multiWithDeleteOnly(t, pgs) + }) + + t.Run("Multi with set only", func(t *testing.T) { + t.Parallel() + multiWithSetOnly(t, pgs) + }) +} + +// setGetUpdateDeleteOneItem validates setting one item, getting it, and deleting it. +func setGetUpdateDeleteOneItem(t *testing.T, pgs *CockroachDB) { + t.Helper() + + key := randomKey() + value := &fakeItem{Color: "yellow"} + + setItem(t, pgs, key, value, nil) + + getResponse, outputObject := getItem(t, pgs, key) + assert.Equal(t, value, outputObject) + + newValue := &fakeItem{Color: "green"} + setItem(t, pgs, key, newValue, getResponse.ETag) + getResponse, outputObject = getItem(t, pgs, key) + assert.Equal(t, newValue, outputObject) + + deleteItem(t, pgs, key, getResponse.ETag) +} + +// testCreateTable tests the ability to create the state table. +func testCreateTable(t *testing.T, dba *cockroachDBAccess) { + t.Helper() + + tableName := "test_state" + + // Drop the table if it already exists. + exists, err := tableExists(dba.db, tableName) + assert.Nil(t, err) + if exists { + dropTable(t, dba.db, tableName) + } + + // Create the state table and test for its existence. + err = dba.ensureStateTable(tableName) + assert.Nil(t, err) + exists, err = tableExists(dba.db, tableName) + assert.Nil(t, err) + assert.True(t, exists) + + // Drop the state table. + dropTable(t, dba.db, tableName) +} + +func dropTable(t *testing.T, db *sql.DB, tableName string) { + t.Helper() + + _, err := db.Exec(fmt.Sprintf("DROP TABLE %s", tableName)) + assert.Nil(t, err) +} + +func deleteItemThatDoesNotExist(t *testing.T, pgs *CockroachDB) { + t.Helper() + + // Delete the item with a key not in the store. + deleteReq := &state.DeleteRequest{ + Key: randomKey(), + ETag: nil, + Metadata: nil, + Options: state.DeleteStateOption{ + Concurrency: "", + Consistency: "", + }, + } + err := pgs.Delete(deleteReq) + assert.Nil(t, err) +} + +func multiWithSetOnly(t *testing.T, pgs *CockroachDB) { + t.Helper() + + var operations []state.TransactionalStateOperation + var setRequests []state.SetRequest + for i := 0; i < 3; i++ { + req := state.SetRequest{ + Key: randomKey(), + Value: randomJSON(), + ETag: nil, + Metadata: nil, + Options: state.SetStateOption{ + Concurrency: "", + Consistency: "", + }, + ContentType: nil, + } + setRequests = append(setRequests, req) + operations = append(operations, state.TransactionalStateOperation{ + Operation: state.Upsert, + Request: req, + }) + } + + err := pgs.Multi(&state.TransactionalStateRequest{ + Operations: operations, + Metadata: nil, + }) + assert.Nil(t, err) + + for _, set := range setRequests { + assert.True(t, storeItemExists(t, set.Key)) + deleteItem(t, pgs, set.Key, nil) + } +} + +func multiWithDeleteOnly(t *testing.T, pgs *CockroachDB) { + t.Helper() + + var operations []state.TransactionalStateOperation + var deleteRequests []state.DeleteRequest + for i := 0; i < 3; i++ { + req := state.DeleteRequest{ + Key: randomKey(), + ETag: nil, + Metadata: nil, + Options: state.DeleteStateOption{ + Concurrency: "", + Consistency: "", + }, + } + + // Add the item to the database. + setItem(t, pgs, req.Key, randomJSON(), nil) // Add the item to the database. + + // Add the item to a slice of delete requests. + deleteRequests = append(deleteRequests, req) + + // Add the item to the multi transaction request. + operations = append(operations, state.TransactionalStateOperation{ + Operation: state.Delete, + Request: req, + }) + } + + err := pgs.Multi(&state.TransactionalStateRequest{ + Operations: operations, + Metadata: nil, + }) + assert.Nil(t, err) + + for _, delete := range deleteRequests { + assert.False(t, storeItemExists(t, delete.Key)) + } +} + +func multiWithDeleteAndSet(t *testing.T, pgs *CockroachDB) { + t.Helper() + + var operations []state.TransactionalStateOperation + var deleteRequests []state.DeleteRequest + for i := 0; i < 3; i++ { + req := state.DeleteRequest{ + Key: randomKey(), + ETag: nil, + Metadata: nil, + Options: state.DeleteStateOption{ + Concurrency: "", + Consistency: "", + }, + } + + // Add the item to the database. + setItem(t, pgs, req.Key, randomJSON(), nil) // Add the item to the database. + + // Add the item to a slice of delete requests. + deleteRequests = append(deleteRequests, req) + + // Add the item to the multi transaction request. + operations = append(operations, state.TransactionalStateOperation{ + Operation: state.Delete, + Request: req, + }) + } + + // Create the set requests. + var setRequests []state.SetRequest + for i := 0; i < 3; i++ { + req := state.SetRequest{ + Key: randomKey(), + Value: randomJSON(), + ETag: nil, + Metadata: nil, + Options: state.SetStateOption{ + Concurrency: "", + Consistency: "", + }, + ContentType: nil, + } + setRequests = append(setRequests, req) + operations = append(operations, state.TransactionalStateOperation{ + Operation: state.Upsert, + Request: req, + }) + } + + err := pgs.Multi(&state.TransactionalStateRequest{ + Operations: operations, + Metadata: nil, + }) + assert.Nil(t, err) + + for _, delete := range deleteRequests { + assert.False(t, storeItemExists(t, delete.Key)) + } + + for _, set := range setRequests { + assert.True(t, storeItemExists(t, set.Key)) + deleteItem(t, pgs, set.Key, nil) + } +} + +func deleteWithInvalidEtagFails(t *testing.T, pgs *CockroachDB) { + t.Helper() + + // Create new item. + key := randomKey() + value := &fakeItem{Color: "mauve"} + setItem(t, pgs, key, value, nil) + + etag := "1234" + // Delete the item with a fake etag. + deleteReq := &state.DeleteRequest{ + Key: key, + ETag: &etag, + Metadata: nil, + Options: state.DeleteStateOption{ + Concurrency: "", + Consistency: "", + }, + } + err := pgs.Delete(deleteReq) + assert.NotNil(t, err) +} + +func deleteWithNoKeyFails(t *testing.T, pgs *CockroachDB) { + t.Helper() + + deleteReq := &state.DeleteRequest{ + Key: "", + ETag: nil, + Metadata: nil, + Options: state.DeleteStateOption{ + Concurrency: "", + Consistency: "", + }, + } + err := pgs.Delete(deleteReq) + assert.NotNil(t, err) +} + +// newItemWithEtagFails creates a new item and also supplies an ETag, which is invalid - expect failure. +func newItemWithEtagFails(t *testing.T, pgs *CockroachDB) { + t.Helper() + + value := &fakeItem{Color: "teal"} + invalidEtag := "12345" + + setReq := &state.SetRequest{ + Key: randomKey(), + ETag: &invalidEtag, + Value: value, + Metadata: nil, + Options: state.SetStateOption{ + Concurrency: "", + Consistency: "", + }, + ContentType: nil, + } + + err := pgs.Set(setReq) + assert.NotNil(t, err) +} + +func updateWithOldEtagFails(t *testing.T, pgs *CockroachDB) { + t.Helper() + + // Create and retrieve new item. + key := randomKey() + value := &fakeItem{Color: "gray"} + setItem(t, pgs, key, value, nil) + getResponse, _ := getItem(t, pgs, key) + assert.NotNil(t, getResponse.ETag) + originalEtag := getResponse.ETag + + // Change the value and get the updated etag. + newValue := &fakeItem{Color: "silver"} + setItem(t, pgs, key, newValue, originalEtag) + _, updatedItem := getItem(t, pgs, key) + assert.Equal(t, newValue, updatedItem) + + // Update again with the original etag - expect udpate failure. + newValue = &fakeItem{Color: "maroon"} + setReq := &state.SetRequest{ + Key: key, + ETag: originalEtag, + Value: newValue, + Metadata: nil, + Options: state.SetStateOption{ + Concurrency: "", + Consistency: "", + }, + ContentType: nil, + } + err := pgs.Set(setReq) + assert.NotNil(t, err) +} + +func updateAndDeleteWithEtagSucceeds(t *testing.T, pgs *CockroachDB) { + t.Helper() + + // Create and retrieve new item. + key := randomKey() + value := &fakeItem{Color: "hazel"} + setItem(t, pgs, key, value, nil) + getResponse, _ := getItem(t, pgs, key) + assert.NotNil(t, getResponse.ETag) + + // Change the value and compare. + value.Color = "purple" + setItem(t, pgs, key, value, getResponse.ETag) + updateResponse, updatedItem := getItem(t, pgs, key) + assert.Equal(t, value, updatedItem) + + // ETag should change when item is updated. + assert.NotEqual(t, getResponse.ETag, updateResponse.ETag) + + // Delete. + deleteItem(t, pgs, key, updateResponse.ETag) + + // Item is not in the data store. + assert.False(t, storeItemExists(t, key)) +} + +// getItemThatDoesNotExist validates the behavior of retrieving an item that does not exist. +func getItemThatDoesNotExist(t *testing.T, pgs *CockroachDB) { + t.Helper() + + key := randomKey() + response, outputObject := getItem(t, pgs, key) + assert.Nil(t, response.Data) + assert.Equal(t, "", outputObject.Color) +} + +// getItemWithNoKey validates that attempting a Get operation without providing a key will return an error. +func getItemWithNoKey(t *testing.T, pgs *CockroachDB) { + t.Helper() + + getReq := &state.GetRequest{ + Key: "", + Metadata: nil, + Options: state.GetStateOption{ + Consistency: "", + }, + } + + response, getErr := pgs.Get(getReq) + assert.NotNil(t, getErr) + assert.Nil(t, response) +} + +// setUpdatesTheUpdatedateField proves that the updateddate is set for an update, and not set upon insert. +func setUpdatesTheUpdatedateField(t *testing.T, pgs *CockroachDB) { + t.Helper() + + key := randomKey() + value := &fakeItem{Color: "orange"} + setItem(t, pgs, key, value, nil) + + // insertdate should have a value and updatedate should be nil. + _, insertdate, updatedate := getRowData(t, key) + assert.NotNil(t, insertdate) + assert.Equal(t, "", updatedate.String) + + // insertdate should not change, updatedate should have a value. + value = &fakeItem{Color: "aqua"} + setItem(t, pgs, key, value, nil) + _, newinsertdate, updatedate := getRowData(t, key) + assert.Equal(t, insertdate, newinsertdate) // The insertdate should not change. + assert.NotEqual(t, "", updatedate.String) + + deleteItem(t, pgs, key, nil) +} + +func setItemWithNoKey(t *testing.T, pgs *CockroachDB) { + t.Helper() + + setReq := &state.SetRequest{ + Key: "", + Value: nil, + ETag: nil, + Metadata: nil, + Options: state.SetStateOption{ + Concurrency: "", + Consistency: "", + }, + ContentType: nil, + } + + err := pgs.Set(setReq) + assert.NotNil(t, err) +} + +// Tests valid bulk sets and deletes. +func testBulkSetAndBulkDelete(t *testing.T, pgs *CockroachDB) { + t.Helper() + + setReq := []state.SetRequest{ + { + Key: randomKey(), + Value: &fakeItem{Color: "blue"}, + }, + { + Key: randomKey(), + Value: &fakeItem{Color: "red"}, + }, + } + + err := pgs.BulkSet(setReq) + assert.Nil(t, err) + assert.True(t, storeItemExists(t, setReq[0].Key)) + assert.True(t, storeItemExists(t, setReq[1].Key)) + + deleteReq := []state.DeleteRequest{ + { + Key: setReq[0].Key, + }, + { + Key: setReq[1].Key, + }, + } + + err = pgs.BulkDelete(deleteReq) + assert.Nil(t, err) + assert.False(t, storeItemExists(t, setReq[0].Key)) + assert.False(t, storeItemExists(t, setReq[1].Key)) +} + +// testInitConfiguration tests valid and invalid config settings. +func testInitConfiguration(t *testing.T) { + t.Helper() + + logger := logger.NewLogger("test") + tests := []struct { + name string + props map[string]string + expectedErr string + }{ + { + name: "Empty", + props: map[string]string{}, + expectedErr: errMissingConnectionString, + }, + { + name: "Valid connection string", + props: map[string]string{connectionStringKey: getConnectionString()}, + expectedErr: "", + }, + } + + for _, rowTest := range tests { + t.Run(rowTest.name, func(t *testing.T) { + cockroackDB := New(logger) + defer cockroackDB.Close() + + metadata := state.Metadata{ + Properties: rowTest.props, + } + + err := cockroackDB.Init(metadata) + if rowTest.expectedErr == "" { + assert.Nil(t, err) + } else { + assert.NotNil(t, err) + assert.Equal(t, err.Error(), rowTest.expectedErr) + } + }) + } +} + +func getConnectionString() string { + return os.Getenv(connectionStringEnvKey) +} + +func setItem(t *testing.T, pgs *CockroachDB, key string, value interface{}, etag *string) { + t.Helper() + + setReq := &state.SetRequest{ + Key: key, + ETag: etag, + Value: value, + Metadata: map[string]string{}, + Options: state.SetStateOption{ + Concurrency: "", + Consistency: "", + }, + ContentType: nil, + } + + err := pgs.Set(setReq) + assert.Nil(t, err) + itemExists := storeItemExists(t, key) + assert.True(t, itemExists) +} + +func getItem(t *testing.T, pgs *CockroachDB, key string) (*state.GetResponse, *fakeItem) { + t.Helper() + + getReq := &state.GetRequest{ + Key: key, + Options: state.GetStateOption{ + Consistency: "", + }, + Metadata: map[string]string{}, + } + + response, getErr := pgs.Get(getReq) + assert.Nil(t, getErr) + assert.NotNil(t, response) + outputObject := &fakeItem{ + Color: "", + } + _ = json.Unmarshal(response.Data, outputObject) + + return response, outputObject +} + +func deleteItem(t *testing.T, pgs *CockroachDB, key string, etag *string) { + t.Helper() + + deleteReq := &state.DeleteRequest{ + Key: key, + ETag: etag, + Options: state.DeleteStateOption{ + Concurrency: "", + Consistency: "", + }, + Metadata: map[string]string{}, + } + + deleteErr := pgs.Delete(deleteReq) + assert.Nil(t, deleteErr) + assert.False(t, storeItemExists(t, key)) +} + +func storeItemExists(t *testing.T, key string) bool { + t.Helper() + + databaseConnection, err := sql.Open("pgx", getConnectionString()) + assert.Nil(t, err) + defer databaseConnection.Close() + + exists := false + statement := fmt.Sprintf(`SELECT EXISTS (SELECT * FROM %s WHERE key = $1)`, tableName) + err = databaseConnection.QueryRow(statement, key).Scan(&exists) + assert.Nil(t, err) + + return exists +} + +func getRowData(t *testing.T, key string) (returnValue string, insertdate sql.NullString, updatedate sql.NullString) { + t.Helper() + + databaseConnection, err := sql.Open("pgx", getConnectionString()) + assert.Nil(t, err) + defer databaseConnection.Close() + + err = databaseConnection.QueryRow(fmt.Sprintf("SELECT value, insertdate, updatedate FROM %s WHERE key = $1", tableName), key).Scan(&returnValue, &insertdate, &updatedate) + assert.Nil(t, err) + + return returnValue, insertdate, updatedate +} + +func randomKey() string { + return uuid.New().String() +} + +func randomJSON() *fakeItem { + return &fakeItem{Color: randomKey()} +} diff --git a/state/cockroachdb/cockroachdb_query.go b/state/cockroachdb/cockroachdb_query.go new file mode 100644 index 0000000000..5fffb006ae --- /dev/null +++ b/state/cockroachdb/cockroachdb_query.go @@ -0,0 +1,210 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cockroachdb + +import ( + "database/sql" + "fmt" + "strconv" + "strings" + + "github.com/agrea/ptr" + + "github.com/dapr/components-contrib/state" + "github.com/dapr/components-contrib/state/query" + "github.com/dapr/kit/logger" +) + +type Query struct { + query string + params []interface{} + limit int + skip *int64 +} + +func (q *Query) VisitEQ(filter *query.EQ) (string, error) { + return q.whereFieldEqual(filter.Key, filter.Val), nil +} + +func (q *Query) VisitIN(filter *query.IN) (string, error) { + if len(filter.Vals) == 0 { + return "", fmt.Errorf("empty IN operator for key %q", filter.Key) + } + + str := "(" + str += q.whereFieldEqual(filter.Key, filter.Vals[0]) + + for _, v := range filter.Vals[1:] { + str += " OR " + str += q.whereFieldEqual(filter.Key, v) + } + str += ")" + return str, nil +} + +func (q *Query) visitFilters(operation string, filters []query.Filter) (string, error) { + var ( + str string + err error + ) + + arr := make([]string, len(filters)) + + for filterIndex, filter := range filters { + switch filterType := filter.(type) { + case *query.EQ: + str, err = q.VisitEQ(filterType) + case *query.IN: + str, err = q.VisitIN(filterType) + case *query.OR: + str, err = q.VisitOR(filterType) + case *query.AND: + str, err = q.VisitAND(filterType) + default: + return "", fmt.Errorf("unsupported filter type %#v", filterType) + } + + if err != nil { + return "", err + } + + arr[filterIndex] = str + } + + sep := fmt.Sprintf(" %s ", operation) + + return fmt.Sprintf("(%s)", strings.Join(arr, sep)), nil +} + +func (q *Query) VisitAND(filter *query.AND) (string, error) { + return q.visitFilters("AND", filter.Filters) +} + +func (q *Query) VisitOR(filter *query.OR) (string, error) { + return q.visitFilters("OR", filter.Filters) +} + +func (q *Query) Finalize(filters string, storeQuery *query.Query) error { + q.query = fmt.Sprintf("SELECT key, value, etag FROM %s", tableName) + + if filters != "" { + q.query += fmt.Sprintf(" WHERE %s", filters) + } + + if len(storeQuery.Sort) > 0 { + q.query += " ORDER BY " + + for sortIndex, sortItem := range storeQuery.Sort { + if sortIndex > 0 { + q.query += ", " + } + q.query += translateFieldToFilter(sortItem.Key) + if sortItem.Order != "" { + q.query += fmt.Sprintf(" %s", sortItem.Order) + } + } + } + + if storeQuery.Page.Limit > 0 { + q.query += fmt.Sprintf(" LIMIT %d", storeQuery.Page.Limit) + q.limit = storeQuery.Page.Limit + } + + if len(storeQuery.Page.Token) != 0 { + skip, err := strconv.ParseInt(storeQuery.Page.Token, 10, 64) + if err != nil { + return err + } + q.query += fmt.Sprintf(" OFFSET %d", skip) + q.skip = &skip + } + + return nil +} + +func (q *Query) execute(logger logger.Logger, db *sql.DB) ([]state.QueryItem, string, error) { + rows, err := db.Query(q.query, q.params...) + if err != nil { + return nil, "", fmt.Errorf("query executes '%s' failed: %w", q.query, err) + } + defer rows.Close() + + ret := []state.QueryItem{} + for rows.Next() { + var ( + key string + data []byte + etag int + ) + if err = rows.Scan(&key, &data, &etag); err != nil { + return nil, "", fmt.Errorf("read fields from query '%s' failed: %w", q.query, err) + } + result := state.QueryItem{ + Key: key, + Data: data, + ETag: ptr.String(strconv.Itoa(etag)), + Error: "", + ContentType: nil, + } + ret = append(ret, result) + } + + if err = rows.Err(); err != nil { + return nil, "", fmt.Errorf("interation rows from query '%s' failed: %w", q.query, err) + } + + var token string + if q.limit != 0 { + var skip int64 + if q.skip != nil { + skip = *q.skip + } + token = strconv.FormatInt(skip+int64(len(ret)), 10) + } + + return ret, token, nil +} + +func (q *Query) addParamValueAndReturnPosition(value interface{}) int { + q.params = append(q.params, fmt.Sprintf("%v", value)) + return len(q.params) +} + +func translateFieldToFilter(key string) string { + // add preceding "value". + key = "value." + key + + fieldParts := strings.Split(key, ".") + filterField := fieldParts[0] + fieldParts = fieldParts[1:] + + for fieldIndex, fieldPart := range fieldParts { + filterField += "->" + + if fieldIndex+1 == len(fieldParts) { + filterField += ">" + } + + filterField += fmt.Sprintf("'%s'", fieldPart) + } + + return filterField +} + +func (q *Query) whereFieldEqual(key string, value interface{}) string { + position := q.addParamValueAndReturnPosition(value) + filterField := translateFieldToFilter(key) + query := fmt.Sprintf("%s=$%v", filterField, position) + return query +} diff --git a/state/cockroachdb/cockroachdb_query_test.go b/state/cockroachdb/cockroachdb_query_test.go new file mode 100644 index 0000000000..225f6f3ff6 --- /dev/null +++ b/state/cockroachdb/cockroachdb_query_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cockroachdb + +import ( + "encoding/json" + "io/ioutil" + "testing" + + "github.com/agrea/ptr" + "github.com/stretchr/testify/assert" + + "github.com/dapr/components-contrib/state/query" +) + +func TestPostgresqlQueryBuildQuery(t *testing.T) { + t.Parallel() + + tests := []struct { + input string + query string + }{ + { + input: "../../tests/state/query/q1.json", + query: "SELECT key, value, etag FROM state LIMIT 2", + }, + { + input: "../../tests/state/query/q2.json", + query: "SELECT key, value, etag FROM state WHERE value->>'state'=$1 LIMIT 2", + }, + { + input: "../../tests/state/query/q2-token.json", + query: "SELECT key, value, etag FROM state WHERE value->>'state'=$1 LIMIT 2 OFFSET 2", + }, + { + input: "../../tests/state/query/q3.json", + query: "SELECT key, value, etag FROM state WHERE (value->'person'->>'org'=$1 AND (value->>'state'=$2 OR value->>'state'=$3)) ORDER BY value->>'state' DESC, value->'person'->>'name'", + }, + { + input: "../../tests/state/query/q4.json", + query: "SELECT key, value, etag FROM state WHERE (value->'person'->>'org'=$1 OR (value->'person'->>'org'=$2 AND (value->>'state'=$3 OR value->>'state'=$4))) ORDER BY value->>'state' DESC, value->'person'->>'name' LIMIT 2", + }, + { + input: "../../tests/state/query/q5.json", + query: "SELECT key, value, etag FROM state WHERE (value->'person'->>'org'=$1 AND (value->'person'->>'name'=$2 OR (value->>'state'=$3 OR value->>'state'=$4))) ORDER BY value->>'state' DESC, value->'person'->>'name' LIMIT 2", + }, + } + for _, test := range tests { + data, err := ioutil.ReadFile(test.input) + assert.NoError(t, err) + var storeQuery query.Query + err = json.Unmarshal(data, &storeQuery) + assert.NoError(t, err) + + stateQuery := &Query{ + query: "", + params: nil, + limit: 0, + skip: ptr.Int64(0), + } + qbuilder := query.NewQueryBuilder(stateQuery) + err = qbuilder.BuildQuery(&storeQuery) + assert.NoError(t, err) + assert.Equal(t, test.query, stateQuery.query) + } +} diff --git a/state/cockroachdb/cockroachdb_test.go b/state/cockroachdb/cockroachdb_test.go new file mode 100644 index 0000000000..173cae166f --- /dev/null +++ b/state/cockroachdb/cockroachdb_test.go @@ -0,0 +1,128 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cockroachdb + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/dapr/components-contrib/state" + "github.com/dapr/kit/logger" +) + +const ( + fakeConnectionString = "not a real connection" +) + +// Fake implementation of interface postgressql.dbaccess. +type fakeDBaccess struct { + logger logger.Logger + initExecuted bool + setExecuted bool + getExecuted bool + deleteExecuted bool +} + +func (m *fakeDBaccess) Init(metadata state.Metadata) error { + m.initExecuted = true + + return nil +} + +func (m *fakeDBaccess) Set(req *state.SetRequest) error { + m.setExecuted = true + + return nil +} + +func (m *fakeDBaccess) Get(req *state.GetRequest) (*state.GetResponse, error) { + m.getExecuted = true + + return nil, nil +} + +func (m *fakeDBaccess) Delete(req *state.DeleteRequest) error { + m.deleteExecuted = true + + return nil +} + +func (m *fakeDBaccess) BulkSet(req []state.SetRequest) error { + return nil +} + +func (m *fakeDBaccess) BulkDelete(req []state.DeleteRequest) error { + return nil +} + +func (m *fakeDBaccess) ExecuteMulti(req *state.TransactionalStateRequest) error { + return nil +} + +func (m *fakeDBaccess) Query(req *state.QueryRequest) (*state.QueryResponse, error) { + return nil, nil +} + +func (m *fakeDBaccess) Close() error { + return nil +} + +func (m *fakeDBaccess) Ping() error { + return nil +} + +// Proves that the Init method runs the init method. +func TestInitRunsDBAccessInit(t *testing.T) { + t.Parallel() + _, fake := createCockroachDBWithFake(t) + assert.True(t, fake.initExecuted) +} + +func createCockroachDBWithFake(t *testing.T) (*CockroachDB, *fakeDBaccess) { + t.Helper() + + pgs := createCockroachDB(t) + fake, ok := pgs.dbaccess.(*fakeDBaccess) + assert.True(t, ok) + + return pgs, fake +} + +func createCockroachDB(t *testing.T) *CockroachDB { + t.Helper() + + logger := logger.NewLogger("test") + + dba := &fakeDBaccess{ + logger: logger, + initExecuted: false, + setExecuted: false, + getExecuted: false, + } + + pgs := internalNew(logger, dba) + assert.NotNil(t, pgs) + + metadata := &state.Metadata{ + Properties: map[string]string{connectionStringKey: fakeConnectionString}, + } + + err := pgs.Init(*metadata) + + assert.Nil(t, err) + assert.NotNil(t, pgs.dbaccess) + + return pgs +} diff --git a/state/cockroachdb/dbaccess.go b/state/cockroachdb/dbaccess.go new file mode 100644 index 0000000000..d89af8a545 --- /dev/null +++ b/state/cockroachdb/dbaccess.go @@ -0,0 +1,30 @@ +/* +Copyright 2022 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cockroachdb + +import "github.com/dapr/components-contrib/state" + +// dbAccess is a private interface which enables unit testing of CockroachDB. +type dbAccess interface { + Init(metadata state.Metadata) error + Set(req *state.SetRequest) error + BulkSet(req []state.SetRequest) error + Get(req *state.GetRequest) (*state.GetResponse, error) + Delete(req *state.DeleteRequest) error + BulkDelete(req []state.DeleteRequest) error + ExecuteMulti(req *state.TransactionalStateRequest) error + Query(req *state.QueryRequest) (*state.QueryResponse, error) + Ping() error + Close() error +} diff --git a/tests/config/state/cockroachdb/statestore.yml b/tests/config/state/cockroachdb/statestore.yml new file mode 100644 index 0000000000..21adb8dac4 --- /dev/null +++ b/tests/config/state/cockroachdb/statestore.yml @@ -0,0 +1,11 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.cockroachdb + metadata: + - name: connectionString + value: "host=localhost user=root port=26257 connect_timeout=10 database=dapr_test" + - name: actorStateStore + value: "true" diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index 529b68b18f..2e98f52dc3 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -24,4 +24,7 @@ components: operations: ["set", "get", "delete", "etag", "bulkset", "bulkdelete", "first-write"] - component: cassandra allOperations: false - operations: [ "set", "get", "delete", "bulkset", "bulkdelete" ] \ No newline at end of file + operations: [ "set", "get", "delete", "bulkset", "bulkdelete" ] + - component: cockroachdb + allOperations: false + operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "query" ] \ No newline at end of file diff --git a/tests/conformance/common.go b/tests/conformance/common.go index 13c18f960f..0d6f1f3747 100644 --- a/tests/conformance/common.go +++ b/tests/conformance/common.go @@ -65,6 +65,7 @@ import ( s_cosmosdb "github.com/dapr/components-contrib/state/azure/cosmosdb" s_azuretablestorage "github.com/dapr/components-contrib/state/azure/tablestorage" s_cassandra "github.com/dapr/components-contrib/state/cassandra" + s_cockroachdb "github.com/dapr/components-contrib/state/cockroachdb" s_mongodb "github.com/dapr/components-contrib/state/mongodb" s_mysql "github.com/dapr/components-contrib/state/mysql" s_postgresql "github.com/dapr/components-contrib/state/postgresql" @@ -422,6 +423,8 @@ func loadStateStore(tc TestComponent) state.Store { store = s_azuretablestorage.NewAzureTablesStateStore(testLogger) case "cassandra": store = s_cassandra.NewCassandraStateStore(testLogger) + case "cockroachdb": + store = s_cockroachdb.New(testLogger) default: return nil }