From bb72cb941ca6f6c8ebbaa82842daa824fec54fd4 Mon Sep 17 00:00:00 2001 From: Harshil goel Date: Mon, 15 Jul 2024 22:10:37 +0000 Subject: [PATCH 1/4] Add single key call Currently namespace used by unique generator uses edge predicate to validate. This shouldn't be the case, we should use the namespace provided in context. --- posting/list.go | 52 ++++++++++++++++++++++ posting/list_test.go | 48 ++++++++++++++++++++ posting/lists.go | 74 ++++++++++++++++++++++++++++++ worker/task.go | 104 ++++++++++++++++++++++++++++--------------- 4 files changed, 243 insertions(+), 35 deletions(-) diff --git a/posting/list.go b/posting/list.go index 8c86562ad96..f98070a2776 100644 --- a/posting/list.go +++ b/posting/list.go @@ -1393,6 +1393,58 @@ func (l *List) GetLangTags(readTs uint64) ([]string, error) { hex.EncodeToString(l.key)) } +func (l *List) StaticValue(readTs uint64) (*pb.PostingList, error) { + l.RLock() + defer l.RUnlock() + + return l.StaticValueWithLockHeld(readTs) +} + +func (l *List) StaticValueWithLockHeld(readTs uint64) (*pb.PostingList, error) { + val, found, err := l.findStaticValue(readTs, math.MaxUint64) + if err != nil { + return val, errors.Wrapf(err, + "cannot retrieve default value from list with key %s", hex.EncodeToString(l.key)) + } + if !found { + return val, ErrNoValue + } + return val, nil +} + +func (l *List) findStaticValue(readTs, uid uint64) (*pb.PostingList, bool, error) { + l.AssertRLock() + + mutation, ok := l.mutationMap[readTs] + if ok { + return mutation, true, nil + } + + if l.maxTs < readTs { + mutation, ok = l.mutationMap[l.maxTs] + if ok { + return mutation, true, nil + } + } + + if len(l.mutationMap) != 0 { + for ts, mutation_i := range l.mutationMap { + if ts <= readTs { + mutation = mutation_i + } else { + break + } + } + return mutation, true, nil + } + + if len(l.plist.Postings) > 0 { + return l.plist, true, nil + } + + return nil, false, nil +} + // Value returns the default value from the posting list. The default value is // defined as the value without a language tag. // Value cannot be used to read from cache diff --git a/posting/list_test.go b/posting/list_test.go index 45c0d963262..51dbae57624 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -450,6 +450,54 @@ func TestAddMutation_mrjn1(t *testing.T) { require.Equal(t, 0, ol.Length(txn.StartTs, 0)) } +func TestReadSingleValue(t *testing.T) { + defer setMaxListSize(maxListSize) + maxListSize = math.MaxInt32 + + // We call pl.Iterate and then stop iterating in the first loop when we are reading + // single values. This test confirms that the two functions, getFirst from this file + // and GetSingeValueForKey works without an issue. + + key := x.DataKey(x.GalaxyAttr("value"), 1240) + ol, err := getNew(key, ps, math.MaxUint64) + require.NoError(t, err) + N := int(10000) + for i := 2; i <= N; i += 2 { + edge := &pb.DirectedEdge{ + Value: []byte("ho hey there" + strconv.Itoa(i)), + } + txn := Txn{StartTs: uint64(i)} + addMutationHelper(t, ol, edge, Set, &txn) + require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1)) + kData := ol.getMutation(uint64(i)) + writer := NewTxnWriter(pstore) + if err := writer.SetAt(key, kData, BitDeltaPosting, uint64(i)); err != nil { + require.NoError(t, err) + } + writer.Flush() + + if i%10 == 0 { + // Do frequent rollups, and store data in old timestamp + kvs, err := ol.Rollup(nil, txn.StartTs-3) + require.NoError(t, err) + require.NoError(t, writePostingListToDisk(kvs)) + ol, err = getNew(key, ps, math.MaxUint64) + require.NoError(t, err) + } + + j := 2 + if j < int(ol.minTs) { + j = int(ol.minTs) + } + for ; j < i+6; j++ { + tx := NewTxn(uint64(j)) + k, err := tx.cache.GetSinglePosting(key) + require.NoError(t, err) + checkValue(t, ol, string(k.Postings[0].Value), uint64(j)) + } + } +} + func TestRollupMaxTsIsSet(t *testing.T) { defer setMaxListSize(maxListSize) maxListSize = math.MaxInt32 diff --git a/posting/lists.go b/posting/lists.go index b1abd569cf7..addfe8cb556 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -322,6 +322,80 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) return lc.SetIfAbsent(skey, pl), nil } +// GetSinglePosting retrieves the cached version of the first item in the list associated with the +// given key. This is used for retrieving the value of a scalar predicats. +func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { + getList := func() *pb.PostingList { + lc.RLock() + + pl := &pb.PostingList{} + if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { + err := pl.Unmarshal(delta) + if err == nil { + lc.RUnlock() + return pl + } + } + + l := lc.plists[string(key)] + lc.RUnlock() + + if l != nil { + pl, err := l.StaticValue(lc.startTs) + if err != nil { + return pl + } + } + + return nil + } + + getPostings := func() (*pb.PostingList, error) { + pl := getList() + if pl != nil { + return pl, nil + } + + pl = &pb.PostingList{} + txn := pstore.NewTransactionAt(lc.startTs, false) + item, err := txn.Get(key) + if err != nil { + return nil, err + } + + err = item.Value(func(val []byte) error { + if err := pl.Unmarshal(val); err != nil { + return err + } + return nil + }) + + return pl, err + } + + pl, err := getPostings() + if err == badger.ErrKeyNotFound { + return nil, nil + } + if err != nil { + return nil, err + } + + // Filter and remove STAR_ALL and OP_DELETE Postings + idx := 0 + for _, postings := range pl.Postings { + if hasDeleteAll(postings) { + return nil, nil + } + if postings.Op != Del { + pl.Postings[idx] = postings + idx++ + } + } + pl.Postings = pl.Postings[:idx] + return pl, nil +} + // Get retrieves the cached version of the list associated with the given key. func (lc *LocalCache) Get(key []byte) (*List, error) { return lc.getInternal(key, true) diff --git a/worker/task.go b/worker/task.go index a8969d02ec0..91e1b28a884 100644 --- a/worker/task.go +++ b/worker/task.go @@ -420,6 +420,14 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er outputs := make([]*pb.Result, numGo) listType := schema.State().IsList(q.Attr) + // These are certain special cases where we can get away with reading only the latest value + // Lang doesn't work because we would be storing various different languages at various + // time. So when we go to read the latest value, we might get a different language. + // Similarly with DoCount and ExpandAll and Facets. List types are also not supported + // because list is stored by time, and we combine all the list items at various timestamps. + hasLang := schema.State().HasLang(q.Attr) + getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang || q.FacetParam != nil + calculate := func(start, end int) error { x.AssertTrue(start%width == 0) out := &pb.Result{} @@ -434,49 +442,75 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er key := x.DataKey(q.Attr, q.UidList.Uids[i]) // Get or create the posting list for an entity, attribute combination. - pl, err := qs.cache.Get(key) - if err != nil { - return err - } - // If count is being requested, there is no need to populate value and facets matrix. - if q.DoCount { - count, err := countForValuePostings(args, pl, facetsTree, listType) - if err != nil && err != posting.ErrNoValue { + var vals []types.Val + fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored + + if !getMultiplePosting { + pl, err := qs.cache.GetSinglePosting(key) + if err != nil { + return err + } + if pl == nil || len(pl.Postings) == 0 { + out.UidMatrix = append(out.UidMatrix, &pb.List{}) + out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) + out.ValueMatrix = append(out.ValueMatrix, + &pb.ValueList{Values: []*pb.TaskValue{}}) + continue + } + vals = make([]types.Val, len(pl.Postings)) + for i, p := range pl.Postings { + vals[i] = types.Val{ + Tid: types.TypeID(p.ValType), + Value: p.Value, + } + } + } else { + pl, err := qs.cache.Get(key) + if err != nil { return err } - out.Counts = append(out.Counts, uint32(count)) - // Add an empty UID list to make later processing consistent. - out.UidMatrix = append(out.UidMatrix, &pb.List{}) - continue - } - vals, fcs, err := retrieveValuesAndFacets(args, pl, facetsTree, listType) - switch { - case err == posting.ErrNoValue || (err == nil && len(vals) == 0): - // This branch is taken when the value does not exist in the pl or - // the number of values retrieved is zero (there could still be facets). - // We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and - // LangMatrix so that all these data structure have predictable layouts. - out.UidMatrix = append(out.UidMatrix, &pb.List{}) - out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) - out.ValueMatrix = append(out.ValueMatrix, - &pb.ValueList{Values: []*pb.TaskValue{}}) - if q.ExpandAll { - // To keep the cardinality same as that of ValueMatrix. - out.LangMatrix = append(out.LangMatrix, &pb.LangList{}) + // If count is being requested, there is no need to populate value and facets matrix. + if q.DoCount { + count, err := countForValuePostings(args, pl, facetsTree, listType) + if err != nil && err != posting.ErrNoValue { + return err + } + out.Counts = append(out.Counts, uint32(count)) + // Add an empty UID list to make later processing consistent. + out.UidMatrix = append(out.UidMatrix, &pb.List{}) + continue } - continue - case err != nil: - return err - } - if q.ExpandAll { - langTags, err := pl.GetLangTags(args.q.ReadTs) - if err != nil { + vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType) + + switch { + case err == posting.ErrNoValue || (err == nil && len(vals) == 0): + // This branch is taken when the value does not exist in the pl or + // the number of values retrieved is zero (there could still be facets). + // We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and + // LangMatrix so that all these data structure have predictable layouts. + out.UidMatrix = append(out.UidMatrix, &pb.List{}) + out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) + out.ValueMatrix = append(out.ValueMatrix, + &pb.ValueList{Values: []*pb.TaskValue{}}) + if q.ExpandAll { + // To keep the cardinality same as that of ValueMatrix. + out.LangMatrix = append(out.LangMatrix, &pb.LangList{}) + } + continue + case err != nil: return err } - out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags}) + + if q.ExpandAll { + langTags, err := pl.GetLangTags(args.q.ReadTs) + if err != nil { + return err + } + out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags}) + } } uidList := new(pb.List) From 5a037f97b9f78f04f5ae1b04074fbedea2a57922 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Sat, 10 Aug 2024 02:08:29 +0530 Subject: [PATCH 2/4] fixed comments --- posting/list.go | 53 ++++++++++++++++++++++++------------------------ posting/lists.go | 31 +++++++++++++--------------- 2 files changed, 40 insertions(+), 44 deletions(-) diff --git a/posting/list.go b/posting/list.go index f98070a2776..eb6362919d9 100644 --- a/posting/list.go +++ b/posting/list.go @@ -1397,52 +1397,51 @@ func (l *List) StaticValue(readTs uint64) (*pb.PostingList, error) { l.RLock() defer l.RUnlock() - return l.StaticValueWithLockHeld(readTs) + return l.findStaticValue(readTs), nil } -func (l *List) StaticValueWithLockHeld(readTs uint64) (*pb.PostingList, error) { - val, found, err := l.findStaticValue(readTs, math.MaxUint64) - if err != nil { - return val, errors.Wrapf(err, - "cannot retrieve default value from list with key %s", hex.EncodeToString(l.key)) - } - if !found { - return val, ErrNoValue - } - return val, nil -} - -func (l *List) findStaticValue(readTs, uid uint64) (*pb.PostingList, bool, error) { +func (l *List) findStaticValue(readTs uint64) *pb.PostingList { l.AssertRLock() + if l.mutationMap == nil { + // If mutation map is empty, check if there is some data, and return it. + if l.plist != nil && len(l.plist.Postings) > 0 { + return l.plist + } + return nil + } + + // Return readTs is if it's present in the mutation. It's going to be the latest value. mutation, ok := l.mutationMap[readTs] if ok { - return mutation, true, nil + return mutation } + // If maxTs < readTs then we need to read maxTs if l.maxTs < readTs { mutation, ok = l.mutationMap[l.maxTs] if ok { - return mutation, true, nil + return mutation } } - if len(l.mutationMap) != 0 { - for ts, mutation_i := range l.mutationMap { - if ts <= readTs { - mutation = mutation_i - } else { - break - } + // This means that maxTs > readTs. Go through the map to find the closest value to readTs + mutation = nil + ts_found := uint64(0) + for ts, mutation_i := range l.mutationMap { + if ts <= readTs && ts > ts_found { + ts_found = ts + mutation = mutation_i } - return mutation, true, nil } - if len(l.plist.Postings) > 0 { - return l.plist, true, nil + if mutation != nil { + return mutation } - return nil, false, nil + // If we reach here, that means that there was no entry in mutation map which is less than readTs. That + // means we need to return l.plist + return l.plist } // Value returns the default value from the posting list. The default value is diff --git a/posting/lists.go b/posting/lists.go index addfe8cb556..8ca5ea427f9 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -325,49 +325,46 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) // GetSinglePosting retrieves the cached version of the first item in the list associated with the // given key. This is used for retrieving the value of a scalar predicats. func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { - getList := func() *pb.PostingList { + // This would return an error if there is some data in the local cache, but we couldn't read it. + getListFromLocalCache := func() (*pb.PostingList, error) { lc.RLock() pl := &pb.PostingList{} if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { err := pl.Unmarshal(delta) - if err == nil { - lc.RUnlock() - return pl - } + lc.RUnlock() + return pl, err } l := lc.plists[string(key)] lc.RUnlock() if l != nil { - pl, err := l.StaticValue(lc.startTs) - if err != nil { - return pl - } + return l.StaticValue(lc.startTs) } - return nil + return nil, nil } getPostings := func() (*pb.PostingList, error) { - pl := getList() - if pl != nil { - return pl, nil + pl, err := getListFromLocalCache() + // If both pl and err are empty, that means that there was no data in local cache, hence we should + // read the data from badger. + if pl != nil || err != nil { + return pl, err } pl = &pb.PostingList{} txn := pstore.NewTransactionAt(lc.startTs, false) + defer txn.Discard() + item, err := txn.Get(key) if err != nil { return nil, err } err = item.Value(func(val []byte) error { - if err := pl.Unmarshal(val); err != nil { - return err - } - return nil + return pl.Unmarshal(val) }) return pl, err From 6642a76c63470de69512c59e6b3fcb7fa8c2a784 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Tue, 13 Aug 2024 00:42:58 +0530 Subject: [PATCH 3/4] test --- posting/list_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/posting/list_test.go b/posting/list_test.go index 51dbae57624..1f214b558ab 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -124,6 +124,53 @@ func (l *List) commitMutation(startTs, commitTs uint64) error { return nil } +func TestGetSinglePosting(t *testing.T) { + key := x.DataKey(x.GalaxyAttr("GetSinglePosting"), 123) + txn := NewTxn(5) + l, err := txn.Get(key) + require.NoError(t, err) + + create_pl := func(startTs uint64) *pb.PostingList { + return &pb.PostingList{ + Postings: []*pb.Posting{{ + Uid: 1, + Op: 1, + StartTs: startTs, + }}, + } + } + + res, err := l.StaticValue(1) + require.NoError(t, err) + require.Equal(t, res == nil, true) + + l.plist = create_pl(1) + + res, err = l.StaticValue(1) + require.NoError(t, err) + require.Equal(t, res.Postings[0].StartTs, uint64(1)) + + res, err = l.StaticValue(4) + require.NoError(t, err) + require.Equal(t, res.Postings[0].StartTs, uint64(1)) + + l.mutationMap = make(map[uint64]*pb.PostingList) + l.mutationMap[3] = create_pl(3) + l.maxTs = 3 + + res, err = l.StaticValue(1) + require.NoError(t, err) + require.Equal(t, res.Postings[0].StartTs, uint64(1)) + + res, err = l.StaticValue(3) + require.NoError(t, err) + require.Equal(t, res.Postings[0].StartTs, uint64(3)) + + res, err = l.StaticValue(4) + require.NoError(t, err) + require.Equal(t, res.Postings[0].StartTs, uint64(3)) +} + func TestAddMutation(t *testing.T) { key := x.DataKey(x.GalaxyAttr("name"), 2) From f2443ca7d31ee9cc7516386bc2f75f7c6f8b720d Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Tue, 13 Aug 2024 03:01:46 +0530 Subject: [PATCH 4/4] fixed bug --- posting/list.go | 3 ++- posting/list_test.go | 27 ++++++++++++++++++++++----- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/posting/list.go b/posting/list.go index eb6362919d9..fe5acde126f 100644 --- a/posting/list.go +++ b/posting/list.go @@ -1428,7 +1428,8 @@ func (l *List) findStaticValue(readTs uint64) *pb.PostingList { // This means that maxTs > readTs. Go through the map to find the closest value to readTs mutation = nil ts_found := uint64(0) - for ts, mutation_i := range l.mutationMap { + for _, mutation_i := range l.mutationMap { + ts := mutation_i.CommitTs if ts <= readTs && ts > ts_found { ts_found = ts mutation = mutation_i diff --git a/posting/list_test.go b/posting/list_test.go index 1f214b558ab..233e63455cf 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -132,11 +132,15 @@ func TestGetSinglePosting(t *testing.T) { create_pl := func(startTs uint64) *pb.PostingList { return &pb.PostingList{ - Postings: []*pb.Posting{{ - Uid: 1, - Op: 1, - StartTs: startTs, - }}, + Postings: []*pb.Posting{ + { + Uid: 1, + Op: 1, + StartTs: startTs, + CommitTs: startTs, + }, + }, + CommitTs: startTs, } } @@ -169,6 +173,19 @@ func TestGetSinglePosting(t *testing.T) { res, err = l.StaticValue(4) require.NoError(t, err) require.Equal(t, res.Postings[0].StartTs, uint64(3)) + + // Create txn from 4->6. It could be stored as 4 or 6 in the map. + l.mutationMap[4] = create_pl(6) + l.mutationMap[4].Postings[0].StartTs = 4 + l.maxTs = 6 + + res, err = l.StaticValue(5) + require.NoError(t, err) + require.Equal(t, res.Postings[0].StartTs, uint64(3)) + + res, err = l.StaticValue(6) + require.NoError(t, err) + require.Equal(t, res.Postings[0].StartTs, uint64(4)) } func TestAddMutation(t *testing.T) {