Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AdaptiveRangeLimit constant for adaptive range limits #294

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions connectors/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type partitionRange struct {
end int
}

const defaultRangeLimit = 200

// remove deletes the values referenced by the partitionRange. Since this function modifies
// the data stored in the in-memory connector, a write lock must be held when calling
// this function.
Expand Down Expand Up @@ -492,6 +494,11 @@ func (c *Connector) Range(_ context.Context, ei *dosa.EntityInfo, columnConditio
partitionRange.start += offset
}
}

if limit == dosa.AdaptiveRangeLimit {
limit = defaultRangeLimit
}

slice := partitionRange.values()
token = ""
if len(slice) > limit {
Expand Down
7 changes: 7 additions & 0 deletions connectors/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,13 @@ func TestConnector_Range(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, data)
assert.Empty(t, token)

// Test with adaptive limits
data, _, _ = sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{
"f1": {{Op: dosa.Eq, Value: dosa.FieldValue("data")}},
"c1": {{Op: dosa.Eq, Value: dosa.FieldValue(int64(1))}},
}, dosa.All(), "", 200)
assert.Len(t, data, idcount)
}

func TestConnector_TUUIDs(t *testing.T) {
Expand Down
8 changes: 6 additions & 2 deletions connectors/random/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import (
)

const (
maxBlobSize = 32
maxStringSize = 64
maxBlobSize = 32
maxStringSize = 64
defaultRangeLimit = 200
)

// Connector is a connector implementation for testing
Expand Down Expand Up @@ -149,6 +150,9 @@ func (c *Connector) MultiRemove(ctx context.Context, ei *dosa.EntityInfo, multiV

// Range returns a random set of data, and a random continuation token
func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
if limit == dosa.AdaptiveRangeLimit {
limit = defaultRangeLimit
}
vals := make([]map[string]dosa.FieldValue, limit)
for inx := range vals {
vals[inx] = Data(ei, minimumFields)
Expand Down
6 changes: 6 additions & 0 deletions connectors/random/random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ func TestRandom_Range(t *testing.T) {
assert.NoError(t, err)
}

func TestRandom_RangeAdaptiveLimits(t *testing.T) {
vals, _, err := sut.Range(ctx, testInfo, testConditions, minimumFields, "", dosa.AdaptiveRangeLimit)
assert.Len(t, vals, 200)
assert.NoError(t, err)
}

func TestRandom_Scan(t *testing.T) {
vals, _, err := sut.Scan(ctx, testInfo, minimumFields, "", 32)
assert.NotNil(t, vals)
Expand Down
2 changes: 1 addition & 1 deletion pager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type pager struct {
}

func addLimitTokenString(w io.Writer, limit int, token string) {
if limit > 0 {
if limit == AdaptiveRangeLimit || limit > 0 {
fmt.Fprintf(w, " limit %d", limit)
}
if token != "" {
Expand Down
8 changes: 8 additions & 0 deletions range.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ import (
"github.com/pkg/errors"
)

const (
// AdaptiveRangeLimit is a sentinel value that is used to indicate an intent
// to range over data in a partition as fast as possible. The server will
// determine an appropriate limit to use to range over the partition as fast
// as possible while ensuring the server remains healthy.
AdaptiveRangeLimit = -1
)

// RangeOp is used to specify constraints to Range calls
type RangeOp struct {
pager
Expand Down
6 changes: 6 additions & 0 deletions range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ var rangeTestCases = []struct {
stringer: "<empty> limit 10",
converted: "<empty> limit 10",
},
{
descript: "empty with adaptive limit",
rop: NewRangeOp(&AllTypes{}).Limit(AdaptiveRangeLimit),
stringer: "<empty> limit -1",
converted: "<empty> limit -1",
},
{
descript: "empty with token",
rop: NewRangeOp(&AllTypes{}).Offset("toketoketoke"),
Expand Down