Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(query): Read just the latest value for scalar types #8966

Merged
merged 4 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -1393,6 +1393,58 @@ func (l *List) GetLangTags(readTs uint64) ([]string, error) {
hex.EncodeToString(l.key))
}

func (l *List) StaticValue(readTs uint64) (*pb.PostingList, error) {
l.RLock()
defer l.RUnlock()

return l.findStaticValue(readTs), nil
}

func (l *List) findStaticValue(readTs uint64) *pb.PostingList {
l.AssertRLock()

harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
if l.mutationMap == nil {
// If mutation map is empty, check if there is some data, and return it.
if l.plist != nil && len(l.plist.Postings) > 0 {
return l.plist
}
return nil
}

// Return readTs is if it's present in the mutation. It's going to be the latest value.
mutation, ok := l.mutationMap[readTs]
if ok {
return mutation
}

// If maxTs < readTs then we need to read maxTs
if l.maxTs < readTs {
mutation, ok = l.mutationMap[l.maxTs]
if ok {
return mutation
}
}

// This means that maxTs > readTs. Go through the map to find the closest value to readTs
mutation = nil
ts_found := uint64(0)
for _, mutation_i := range l.mutationMap {
ts := mutation_i.CommitTs
if ts <= readTs && ts > ts_found {
ts_found = ts
mutation = mutation_i
}
}

if mutation != nil {
return mutation
}

// If we reach here, that means that there was no entry in mutation map which is less than readTs. That
// means we need to return l.plist
return l.plist
}

// Value returns the default value from the posting list. The default value is
// defined as the value without a language tag.
// Value cannot be used to read from cache
Expand Down
112 changes: 112 additions & 0 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,70 @@ func (l *List) commitMutation(startTs, commitTs uint64) error {
return nil
}

func TestGetSinglePosting(t *testing.T) {
key := x.DataKey(x.GalaxyAttr("GetSinglePosting"), 123)
txn := NewTxn(5)
l, err := txn.Get(key)
require.NoError(t, err)

create_pl := func(startTs uint64) *pb.PostingList {
return &pb.PostingList{
Postings: []*pb.Posting{
{
Uid: 1,
Op: 1,
StartTs: startTs,
CommitTs: startTs,
},
},
CommitTs: startTs,
}
}

res, err := l.StaticValue(1)
require.NoError(t, err)
require.Equal(t, res == nil, true)

l.plist = create_pl(1)

res, err = l.StaticValue(1)
require.NoError(t, err)
require.Equal(t, res.Postings[0].StartTs, uint64(1))

res, err = l.StaticValue(4)
require.NoError(t, err)
require.Equal(t, res.Postings[0].StartTs, uint64(1))

l.mutationMap = make(map[uint64]*pb.PostingList)
l.mutationMap[3] = create_pl(3)
l.maxTs = 3

res, err = l.StaticValue(1)
require.NoError(t, err)
require.Equal(t, res.Postings[0].StartTs, uint64(1))

res, err = l.StaticValue(3)
require.NoError(t, err)
require.Equal(t, res.Postings[0].StartTs, uint64(3))

res, err = l.StaticValue(4)
require.NoError(t, err)
require.Equal(t, res.Postings[0].StartTs, uint64(3))

// Create txn from 4->6. It could be stored as 4 or 6 in the map.
l.mutationMap[4] = create_pl(6)
l.mutationMap[4].Postings[0].StartTs = 4
l.maxTs = 6

res, err = l.StaticValue(5)
require.NoError(t, err)
require.Equal(t, res.Postings[0].StartTs, uint64(3))

res, err = l.StaticValue(6)
require.NoError(t, err)
require.Equal(t, res.Postings[0].StartTs, uint64(4))
}

func TestAddMutation(t *testing.T) {
key := x.DataKey(x.GalaxyAttr("name"), 2)

Expand Down Expand Up @@ -450,6 +514,54 @@ func TestAddMutation_mrjn1(t *testing.T) {
require.Equal(t, 0, ol.Length(txn.StartTs, 0))
}

func TestReadSingleValue(t *testing.T) {
defer setMaxListSize(maxListSize)
maxListSize = math.MaxInt32

// We call pl.Iterate and then stop iterating in the first loop when we are reading
// single values. This test confirms that the two functions, getFirst from this file
// and GetSingeValueForKey works without an issue.

key := x.DataKey(x.GalaxyAttr("value"), 1240)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
N := int(10000)
for i := 2; i <= N; i += 2 {
edge := &pb.DirectedEdge{
Value: []byte("ho hey there" + strconv.Itoa(i)),
}
txn := Txn{StartTs: uint64(i)}
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
kData := ol.getMutation(uint64(i))
writer := NewTxnWriter(pstore)
if err := writer.SetAt(key, kData, BitDeltaPosting, uint64(i)); err != nil {
require.NoError(t, err)
}
writer.Flush()

if i%10 == 0 {
// Do frequent rollups, and store data in old timestamp
kvs, err := ol.Rollup(nil, txn.StartTs-3)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}

j := 2
if j < int(ol.minTs) {
j = int(ol.minTs)
}
for ; j < i+6; j++ {
tx := NewTxn(uint64(j))
k, err := tx.cache.GetSinglePosting(key)
require.NoError(t, err)
checkValue(t, ol, string(k.Postings[0].Value), uint64(j))
}
}
}

func TestRollupMaxTsIsSet(t *testing.T) {
defer setMaxListSize(maxListSize)
maxListSize = math.MaxInt32
Expand Down
71 changes: 71 additions & 0 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,77 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
return lc.SetIfAbsent(skey, pl), nil
}

// GetSinglePosting retrieves the cached version of the first item in the list associated with the
// given key. This is used for retrieving the value of a scalar predicats.
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
// This would return an error if there is some data in the local cache, but we couldn't read it.
getListFromLocalCache := func() (*pb.PostingList, error) {
lc.RLock()

pl := &pb.PostingList{}
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
err := pl.Unmarshal(delta)
lc.RUnlock()
return pl, err
}

l := lc.plists[string(key)]
lc.RUnlock()
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved

if l != nil {
return l.StaticValue(lc.startTs)
}

return nil, nil
}

getPostings := func() (*pb.PostingList, error) {
pl, err := getListFromLocalCache()
// If both pl and err are empty, that means that there was no data in local cache, hence we should
// read the data from badger.
if pl != nil || err != nil {
return pl, err
}

pl = &pb.PostingList{}
txn := pstore.NewTransactionAt(lc.startTs, false)
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
defer txn.Discard()

item, err := txn.Get(key)
if err != nil {
return nil, err
}

err = item.Value(func(val []byte) error {
return pl.Unmarshal(val)
})

return pl, err
}

pl, err := getPostings()
if err == badger.ErrKeyNotFound {
return nil, nil
}
if err != nil {
return nil, err
}

// Filter and remove STAR_ALL and OP_DELETE Postings
idx := 0
for _, postings := range pl.Postings {
if hasDeleteAll(postings) {
return nil, nil
}
if postings.Op != Del {
pl.Postings[idx] = postings
idx++
}
}
pl.Postings = pl.Postings[:idx]
return pl, nil
}

// Get retrieves the cached version of the list associated with the given key.
func (lc *LocalCache) Get(key []byte) (*List, error) {
return lc.getInternal(key, true)
Expand Down
104 changes: 69 additions & 35 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,14 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
outputs := make([]*pb.Result, numGo)
listType := schema.State().IsList(q.Attr)

// These are certain special cases where we can get away with reading only the latest value
// Lang doesn't work because we would be storing various different languages at various
// time. So when we go to read the latest value, we might get a different language.
// Similarly with DoCount and ExpandAll and Facets. List types are also not supported
// because list is stored by time, and we combine all the list items at various timestamps.
hasLang := schema.State().HasLang(q.Attr)
getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang || q.FacetParam != nil

calculate := func(start, end int) error {
x.AssertTrue(start%width == 0)
out := &pb.Result{}
Expand All @@ -434,49 +442,75 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
key := x.DataKey(q.Attr, q.UidList.Uids[i])

// Get or create the posting list for an entity, attribute combination.
pl, err := qs.cache.Get(key)
if err != nil {
return err
}

// If count is being requested, there is no need to populate value and facets matrix.
if q.DoCount {
count, err := countForValuePostings(args, pl, facetsTree, listType)
if err != nil && err != posting.ErrNoValue {
var vals []types.Val
fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored

if !getMultiplePosting {
pl, err := qs.cache.GetSinglePosting(key)
if err != nil {
return err
}
if pl == nil || len(pl.Postings) == 0 {
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
continue
}
vals = make([]types.Val, len(pl.Postings))
for i, p := range pl.Postings {
vals[i] = types.Val{
Tid: types.TypeID(p.ValType),
Value: p.Value,
}
}
} else {
pl, err := qs.cache.Get(key)
if err != nil {
return err
}
out.Counts = append(out.Counts, uint32(count))
// Add an empty UID list to make later processing consistent.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
continue
}

vals, fcs, err := retrieveValuesAndFacets(args, pl, facetsTree, listType)
switch {
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
// This branch is taken when the value does not exist in the pl or
// the number of values retrieved is zero (there could still be facets).
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
// LangMatrix so that all these data structure have predictable layouts.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
if q.ExpandAll {
// To keep the cardinality same as that of ValueMatrix.
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
// If count is being requested, there is no need to populate value and facets matrix.
if q.DoCount {
count, err := countForValuePostings(args, pl, facetsTree, listType)
if err != nil && err != posting.ErrNoValue {
return err
}
out.Counts = append(out.Counts, uint32(count))
// Add an empty UID list to make later processing consistent.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
continue
}
continue
case err != nil:
return err
}

if q.ExpandAll {
langTags, err := pl.GetLangTags(args.q.ReadTs)
if err != nil {
vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType)

switch {
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
// This branch is taken when the value does not exist in the pl or
// the number of values retrieved is zero (there could still be facets).
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
// LangMatrix so that all these data structure have predictable layouts.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
if q.ExpandAll {
// To keep the cardinality same as that of ValueMatrix.
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
}
continue
case err != nil:
return err
}
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})

if q.ExpandAll {
langTags, err := pl.GetLangTags(args.q.ReadTs)
if err != nil {
return err
}
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
}
}

uidList := new(pb.List)
Expand Down