-
Notifications
You must be signed in to change notification settings - Fork 17
/
ldb_rotating_reader_test.go
379 lines (343 loc) · 8.31 KB
/
ldb_rotating_reader_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
package ctlstore
import (
"context"
"database/sql"
"fmt"
"github.com/segmentio/ctlstore/pkg/ldb"
"github.com/stretchr/testify/require"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"
)
func getMultiDBs(t *testing.T, count int) (dbs []*sql.DB, paths []string) {
var tds []func()
for i := 0; i < count; i++ {
d, td, p := ldb.LDBForTestWithPath(t)
dbs = append(dbs, d)
tds = append(tds, td)
paths = append(paths, p)
}
t.Cleanup(func() {
for _, fn := range tds {
fn()
}
})
return dbs, paths
}
type basic struct {
x int32 `ctlstore:"x"`
}
func TestBasicRotatingReader(t *testing.T) {
dbs, paths := getMultiDBs(t, 2)
for i, db := range dbs {
_, err := db.Exec("CREATE TABLE family___table (x integer primary key);")
if err != nil {
t.Fatalf("failed to setup table: %v", err)
}
_, err = db.Exec(fmt.Sprintf("INSERT INTO family___table VALUES ('%d')", i+1))
if err != nil {
t.Fatalf("failed to insert into table: %v", err)
}
}
rr, err := CustomerRotatingReader(context.Background(), Every30, paths...)
if err != nil {
t.Fatalf("failed to create rotating reader: %v", err)
}
var out basic
reader := rr.(*LDBRotatingReader)
found, err := rr.GetRowByKey(context.Background(), &out, "family", "table", reader.active+1)
if err != nil || !found {
t.Errorf("failed to find key 1: %v", err)
}
require.Equal(t, reader.active+1, out.x)
var out2 basic
atomic.StoreInt32(&reader.active, (reader.active+1)%2)
found, err = reader.GetRowByKey(context.Background(), &out2, "family", "table", reader.active+1)
if err != nil || !found {
t.Errorf("failed to find key 2: %v", err)
}
require.Equal(t, reader.active+1, out2.x)
}
func TestValidRotatingReader(t *testing.T) {
tests := []struct {
name string
expErr string
paths []string
rp RotationPeriod
}{
{
"1 ldb",
"more than 1 ldb",
[]string{"1path"},
Every30,
},
{
"No ldb",
"more than 1 ldb",
[]string{},
Every30,
},
{
"Nil ldb",
"more than 1 ldb",
nil,
Every30,
},
{
"bad rotation",
"invalid rotation",
[]string{"path1", "path2"},
RotationPeriod(2),
},
{
"more ldbs than period, max",
"cannot have more",
[]string{"path1", "path2", "path3", "path4", "path5", "path6", "path7", "path8", "path9", "path10", "path11"},
Every6,
},
{
"more ldbs than period, min",
"cannot have more",
[]string{"path1", "path2", "path3"},
Every30,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := rotatingReader(tt.rp, tt.paths...)
if err == nil {
t.Fatal("error expected, none found")
}
if !strings.Contains(err.Error(), tt.expErr) {
t.Errorf("Did not find right error: got %v", err)
}
})
}
}
func TestRotation(t *testing.T) {
_, paths := getMultiDBs(t, 6)
rr, err := rotatingReader(Every6, paths...)
if err != nil {
t.Fatal("unexpected error creating reader")
}
tests := []struct {
name string
nowFunc func() time.Time
exp int
}{
{
"0-5",
func() time.Time {
return time.Date(2023, 8, 17, 9, 1, 0, 0, time.UTC)
},
0,
},
{
"6-11",
func() time.Time {
return time.Date(2023, 8, 17, 9, 8, 0, 0, time.UTC)
},
1,
},
{
"12-17",
func() time.Time {
return time.Date(2023, 8, 17, 9, 17, 0, 0, time.UTC)
},
2,
},
{
"18-23",
func() time.Time {
return time.Date(2023, 8, 17, 9, 21, 0, 0, time.UTC)
},
3,
},
{
"24-29",
func() time.Time {
return time.Date(2023, 8, 17, 9, 24, 0, 0, time.UTC)
},
4,
},
{
"30-35",
func() time.Time {
return time.Date(2023, 8, 17, 9, 32, 0, 0, time.UTC)
},
5,
},
{
"36-41",
func() time.Time {
return time.Date(2023, 8, 17, 9, 41, 0, 0, time.UTC)
},
0,
},
{
"42-47",
func() time.Time {
return time.Date(2023, 8, 17, 9, 42, 0, 0, time.UTC)
},
1,
},
{
"48-53",
func() time.Time {
return time.Date(2023, 8, 17, 9, 53, 0, 0, time.UTC)
},
2,
},
{
"54-59",
func() time.Time {
return time.Date(2023, 8, 17, 9, 59, 0, 0, time.UTC)
},
3,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rr.now = tt.nowFunc
rr.setActive()
if rr.active != int32(tt.exp) {
t.Errorf("expected %d to be active, got %d instead", tt.exp, rr.active)
}
})
}
}
func TestMultipleReaders(t *testing.T) {
ctx := context.Background()
dbs, paths := getMultiDBs(t, 4)
// create the tables in each db, and add a row unique to that db
for i, db := range dbs {
_, err := db.Exec("CREATE TABLE family___foo (id varchar primary key );")
if err != nil {
t.Fatalf("failure creating table, %v", err)
}
_, err = db.Exec(fmt.Sprintf("INSERT INTO family___foo values ('%d');", i))
if err != nil {
t.Fatalf("failure inserting into table, %v", err)
}
}
rr, err := rotatingReader(Every15, paths...)
if err != nil {
t.Fatalf("unexpected error creating reader, %v", err)
}
i := 0
wait := make(chan interface{})
rr.now = func() time.Time {
defer func() {
if i != 0 {
wait <- 1
}
i = i + 15
}()
return time.Date(2023, 8, 17, 10, 0+i, 59, 999_999_999, time.UTC)
}
rr.tickerInterval = 1 * time.Millisecond
rr.setActive()
go rr.rotate(ctx)
for x := range dbs {
// for each db, ensure that we read its unique row
out := make(map[string]interface{})
val, err := rr.GetRowByKey(ctx, out, "family", "foo", x)
if err != nil || !val {
t.Errorf("unexpected error on GetRowByKey %v", err)
}
require.EqualValues(t, out, map[string]interface{}{"id": strconv.Itoa(x)}, "did not read correct value from table")
// also ensure we can't read any other unique rows from other dbs
for y := range dbs {
if y == x {
continue
}
val, err = rr.GetRowByKey(ctx, out, "family", "foo", y)
if val || err != nil {
t.Errorf("row with key %d should not be found", y)
}
}
// allow the ticker to proceed with its rotation
<-wait
time.Sleep(500 * time.Microsecond)
}
}
type kv struct {
id string `ctlstore:"id"`
bar string `ctlstore:"bar"`
}
// verifies that the rows cursor returned by GetRowsByKeyPrefix is still valid even if a rotation occurs while iterating over the row set
func TestGetRowByPrefixAfterRotation(t *testing.T) {
ctx := context.Background()
dbs, paths := getMultiDBs(t, 4)
// create the tables and multiple rows
for i, db := range dbs {
_, err := db.Exec("CREATE TABLE family___foo (id varchar, bar varchar, primary key (id, bar));")
if err != nil {
t.Fatalf("failure creating table, %v", err)
}
_, err = db.Exec(fmt.Sprintf("INSERT INTO family___foo values ('%d', '0'), ('%d', '1'), ('%d', '2'), ('%d', '3');", i, i, i, i))
if err != nil {
t.Fatalf("failure inserting into table, %v", err)
}
}
rr, err := rotatingReader(Every15, paths...)
if err != nil {
t.Fatalf("unexpected error creating reader, %v", err)
}
i := 0
wait := make(chan interface{})
rr.now = func() time.Time {
defer func() {
if i != 0 {
wait <- 1
}
i = i + 15
}()
return time.Date(2023, 8, 17, 10, (0+i)%60, 59, 999_999_999, time.UTC)
}
rr.tickerInterval = 1 * time.Millisecond
rr.setActive()
// get an active rows cursor for the results set from db 0
rows, err := rr.GetRowsByKeyPrefix(ctx, "family", "foo", "0")
go rr.rotate(ctx)
count := 0
for rows.Next() {
var tar kv
err := rows.Scan(&tar)
if err != nil {
t.Fatalf("scan error: %v", err)
}
require.Equal(t, "0", tar.id)
require.Equal(t, strconv.Itoa(count), tar.bar)
// trigger a rotation
<-wait
time.Sleep(500 * time.Microsecond)
var out kv
count++
// should rotate by now, check if different result set is returned
found, err := rr.GetRowByKey(ctx, &out, "family", "foo", "0", "0")
if count == 4 {
// on the 4th rotation, we're back at the beginning
require.EqualValues(t, kv{"0", "0"}, out, "should have rotated all the way back to the first reader")
} else if found || err != nil {
t.Errorf("should not have found the key since it rotated: %v", err)
}
}
require.Equal(t, 4, count, "should've returned 4 rows")
}
func TestPath(t *testing.T) {
paths := defaultPaths(5)
if len(paths) != 5 {
t.Fatal("should be 5 paths")
}
if paths[0] != defaultPath {
t.Fatalf("First path should be the default, %s", paths[0])
}
for i := 1; i < 5; i++ {
if !strings.Contains(paths[i], strconv.Itoa(i+1)) {
t.Errorf("path %s should've contained its number, %d", paths[i], i)
}
}
}