Skip to content

Commit

Permalink
Fix Table operations with comp indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
jhchabran authored and asdine committed May 1, 2021
1 parent 56b18ab commit fffcc2a
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 33 deletions.
49 changes: 30 additions & 19 deletions database/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,18 @@ func (t *Table) Insert(d document.Document) (document.Document, error) {
indexes := t.Indexes()

for _, idx := range indexes {
vals := make([]document.Value, len(idx.Info.Paths))
vs := make([]document.Value, 0, len(idx.Info.Paths))

for i, path := range idx.Info.Paths {
for _, path := range idx.Info.Paths {
v, err := path.GetValueFromDocument(fb)
if err != nil {
v = document.NewNullValue()
}

vals[i] = v
vs = append(vs, v)
}

err = idx.Set(vals, key)
err = idx.Set(vs, key)
if err != nil {
if err == ErrIndexDuplicateValue {
return nil, ErrDuplicateDocument
Expand Down Expand Up @@ -142,15 +142,21 @@ func (t *Table) Delete(key []byte) error {
indexes := t.Indexes()

for _, idx := range indexes {
values := make([]document.Value, len(idx.Info.Paths))
for i, path := range idx.Info.Paths {
values[i], err = path.GetValueFromDocument(d)
vs := make([]document.Value, 0, len(idx.Info.Paths))
for _, path := range idx.Info.Paths {
v, err := path.GetValueFromDocument(d)
if err != nil {
return err
if err == document.ErrFieldNotFound {
v = document.NewNullValue()
} else {
return err
}
}

vs = append(vs, v)
}

err = idx.Delete(values, key)
err = idx.Delete(vs, key)
if err != nil {
return err
}
Expand Down Expand Up @@ -188,15 +194,16 @@ func (t *Table) replace(indexes []*Index, key []byte, d document.Document) error

// remove key from indexes
for _, idx := range indexes {
values := make([]document.Value, len(idx.Info.Paths))
for i, path := range idx.Info.Paths {
values[i], err = path.GetValueFromDocument(old)
vs := make([]document.Value, 0, len(idx.Info.Paths))
for _, path := range idx.Info.Paths {
v, err := path.GetValueFromDocument(old)
if err != nil {
values[i] = document.NewNullValue()
v = document.NewNullValue()
}
vs = append(vs, v)
}

err = idx.Delete(values, key)
err := idx.Delete(vs, key)
if err != nil {
return err
}
Expand All @@ -219,13 +226,17 @@ func (t *Table) replace(indexes []*Index, key []byte, d document.Document) error

// update indexes
for _, idx := range indexes {
// only support one path
v, err := idx.Info.Paths[0].GetValueFromDocument(d)
if err != nil {
v = document.NewNullValue()
vs := make([]document.Value, 0, len(idx.Info.Paths))
for _, path := range idx.Info.Paths {
v, err := path.GetValueFromDocument(d)
if err != nil {
v = document.NewNullValue()
}

vs = append(vs, v)
}

err = idx.Set([]document.Value{v}, key)
err = idx.Set(vs, key)
if err != nil {
if err == ErrIndexDuplicateValue {
return ErrDuplicateDocument
Expand Down
85 changes: 71 additions & 14 deletions database/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,18 +639,31 @@ func TestTableReplace(t *testing.T) {
_, tx, cleanup := newTestTx(t)
defer cleanup()

err := tx.CreateTable("test", nil)
err := tx.CreateTable("test1", nil)
require.NoError(t, err)

err = tx.CreateTable("test2", nil)
require.NoError(t, err)

// simple indexes
err = tx.CreateIndex(&database.IndexInfo{
Paths: []document.Path{document.NewPath("a")},
Unique: true,
TableName: "test",
TableName: "test1",
IndexName: "idx_foo_a",
})
require.NoError(t, err)

tb, err := tx.GetTable("test")
// composite indexes
err = tx.CreateIndex(&database.IndexInfo{
Paths: []document.Path{document.NewPath("x"), document.NewPath("y")},
Unique: true,
TableName: "test2",
IndexName: "idx_foo_x_y",
})
require.NoError(t, err)

tb, err := tx.GetTable("test1")
require.NoError(t, err)

// insert two different documents
Expand All @@ -659,26 +672,63 @@ func TestTableReplace(t *testing.T) {
d2, err := tb.Insert(testutil.MakeDocument(t, `{"a": 2, "b": 2}`))
require.NoError(t, err)

before := testutil.GetIndexContent(t, tx, "idx_foo_a")
beforeIdxA := testutil.GetIndexContent(t, tx, "idx_foo_a")

// replace doc 1 without modifying indexed key
// --- a
// replace d1 without modifying indexed key
err = tb.Replace(d1.(document.Keyer).RawKey(), testutil.MakeDocument(t, `{"a": 1, "b": 3}`))
require.NoError(t, err)
// index should be the same as before
require.Equal(t, before, testutil.GetIndexContent(t, tx, "idx_foo_a"))

// replace doc 2 and modify indexed key
// indexes should be the same as before
require.Equal(t, beforeIdxA, testutil.GetIndexContent(t, tx, "idx_foo_a"))

// replace d2 and modify indexed key
err = tb.Replace(d2.(document.Keyer).RawKey(), testutil.MakeDocument(t, `{"a": 3, "b": 3}`))
require.NoError(t, err)
// index should be different for doc 2

// indexes should be different for d2
got := testutil.GetIndexContent(t, tx, "idx_foo_a")
require.Equal(t, before[0], got[0])
require.NotEqual(t, before[1], got[1])
require.Equal(t, beforeIdxA[0], got[0])
require.NotEqual(t, beforeIdxA[1], got[1])

// replace doc 1 with duplicate indexed key
// replace d1 with duplicate indexed key
err = tb.Replace(d1.(document.Keyer).RawKey(), testutil.MakeDocument(t, `{"a": 3, "b": 3}`))

// index should be the same as before
require.Equal(t, database.ErrDuplicateDocument, err)

// --- x, y
tb, err = tx.GetTable("test2")
require.NoError(t, err)
// insert two different documents
dc1, err := tb.Insert(testutil.MakeDocument(t, `{"x": 1, "y": 1, "z": 1}`))
require.NoError(t, err)
dc2, err := tb.Insert(testutil.MakeDocument(t, `{"x": 2, "y": 2, "z": 2}`))
require.NoError(t, err)

beforeIdxXY := testutil.GetIndexContent(t, tx, "idx_foo_x_y")
// replace dc1 without modifying indexed key
err = tb.Replace(dc1.(document.Keyer).RawKey(), testutil.MakeDocument(t, `{"x": 1, "y": 1, "z": 2}`))
require.NoError(t, err)

// index should be the same as before
require.Equal(t, beforeIdxXY, testutil.GetIndexContent(t, tx, "idx_foo_x_y"))

// replace dc2 and modify indexed key
err = tb.Replace(dc2.(document.Keyer).RawKey(), testutil.MakeDocument(t, `{"x": 3, "y": 3, "z": 3}`))
require.NoError(t, err)

// indexes should be different for d2
got = testutil.GetIndexContent(t, tx, "idx_foo_x_y")
require.Equal(t, beforeIdxXY[0], got[0])
require.NotEqual(t, beforeIdxXY[1], got[1])

// replace dc2 with duplicate indexed key
err = tb.Replace(dc1.(document.Keyer).RawKey(), testutil.MakeDocument(t, `{"x": 3, "y": 3, "z": 3}`))

// index should be the same as before
require.Equal(t, database.ErrDuplicateDocument, err)

})
}

Expand Down Expand Up @@ -751,7 +801,14 @@ func TestTableIndexes(t *testing.T) {
require.NoError(t, err)
err = tx.CreateIndex(&database.IndexInfo{
Unique: false,
IndexName: "ifx2a",
IndexName: "idx1ab",
TableName: "test1",
Paths: []document.Path{parsePath(t, "a"), parsePath(t, "b")},
})
require.NoError(t, err)
err = tx.CreateIndex(&database.IndexInfo{
Unique: false,
IndexName: "idx2a",
TableName: "test2",
Paths: []document.Path{parsePath(t, "a")},
})
Expand All @@ -762,7 +819,7 @@ func TestTableIndexes(t *testing.T) {

m := tb.Indexes()
require.NoError(t, err)
require.Len(t, m, 2)
require.Len(t, m, 3)
})
}

Expand Down

0 comments on commit fffcc2a

Please sign in to comment.