-
Notifications
You must be signed in to change notification settings - Fork 9
/
workspace.go
365 lines (341 loc) · 11 KB
/
workspace.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
// Copyright Kuei-chun Chen, 2022-present. All rights reserved.
package hummingbird
import (
"context"
"fmt"
"io/fs"
"os"
"path/filepath"
"strings"
"time"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/simagix/keyhole/mdb"
)
// Names of the meta database and of the collections kept inside it.
const (
	// MetaDBName defines default meta database name
	MetaDBName = "_neutrino"
	// MetaLogs defines default meta logs collection name
	MetaLogs = "logs"
	// MetaOplogs defines default meta oplogs collection name
	MetaOplogs = "oplogs"
	// MetaTasks defines default meta tasks collection name
	MetaTasks = "tasks"
)
// Workspace holds what is needed to reach the meta database (its connection
// URI and database name) together with the local spool directory that holds
// cached files.
type Workspace struct {
	// dbName is the meta database dropped by DropMetaDB and indexed by
	// CreateTaskIndexes; dbURI is the connection string given to
	// GetMongoClient.
	dbName, dbURI string
	// spool is the directory CleanUpWorkspace scans for cached files.
	spool string
}
// DropMetaDB drops the database named ws.dbName on the cluster at ws.dbURI.
//
// It returns an error when the URI or the database name is empty, when a
// client cannot be obtained, or when the drop itself fails.
//
// NOTE(review): this drops ws.dbName, while Log, InsertTasks, UpdateTask and
// the other task/oplog helpers in this file write to the MetaDBName constant.
// The two must agree for Reset to clear what those helpers wrote — confirm
// how ws.dbName is populated.
func (ws *Workspace) DropMetaDB() error {
	if ws.dbURI == "" {
		return fmt.Errorf("meta database URI is not defined")
	}
	if ws.dbName == "" {
		return fmt.Errorf("meta database name is not defined")
	}
	client, err := GetMongoClient(ws.dbURI)
	if err != nil {
		return fmt.Errorf("GetMongoClient failed: %w", err)
	}
	return client.Database(ws.dbName).Drop(context.Background())
}
// CleanUpWorkspace removes every regular file whose name ends in
// GZippedBSONFileExt anywhere under the spool directory.
//
// Matching files are collected during the walk and removed only after the
// walk has finished. A spool directory (or entry) that does not exist is
// treated as nothing to clean; any other error from walking the tree or
// removing a file is returned.
func (ws *Workspace) CleanUpWorkspace() error {
	if ws.spool == "" {
		return fmt.Errorf("spool directory is not defined")
	}
	var filenames []string
	walkErr := filepath.WalkDir(ws.spool, func(name string, d fs.DirEntry, err error) error {
		if err != nil {
			// filepath.WalkDir hands over the os.Lstat/ReadDir error unchanged,
			// so os.IsNotExist recognizes it. A missing path has nothing to clean.
			if os.IsNotExist(err) {
				return nil
			}
			return err
		}
		if !d.IsDir() && strings.HasSuffix(d.Name(), GZippedBSONFileExt) {
			filenames = append(filenames, name)
		}
		return nil
	})
	if walkErr != nil {
		return fmt.Errorf("filepath.WalkDir %v failed: %w", ws.spool, walkErr)
	}
	for _, filename := range filenames {
		if err := os.Remove(filename); err != nil {
			return fmt.Errorf("os.Remove failed: %w", err)
		}
	}
	return nil
}
// Reset brings the workspace back to a clean state in three steps: drop the
// meta database, recreate the task indexes, then delete cached spool files.
// The first step that fails aborts the reset and its error is returned
// (the first two wrapped with the step name).
func (ws *Workspace) Reset() error {
	if err := ws.DropMetaDB(); err != nil {
		return fmt.Errorf("DropMetaDB failed: %v", err)
	}
	if err := ws.CreateTaskIndexes(); err != nil {
		return fmt.Errorf("CreateTaskIndexes failed: %v", err)
	}
	return ws.CleanUpWorkspace()
}
// LogConfig records the migration setup through ws.Log. It logs fullVersion,
// then the redacted source URI and a short summary of the source cluster,
// then the same for the target.
//
// Errors returned by ws.Log are not checked. A failure to obtain the source
// or target client is returned, and the message says which side failed.
func (ws *Workspace) LogConfig() error {
	inst := GetMigratorInstance()
	ws.Log(fullVersion)

	ws.Log("from " + RedactedURI(inst.Source))
	client, err := GetMongoClient(inst.Source)
	if err != nil {
		return fmt.Errorf("GetMongoClient for source failed: %w", err)
	}
	ws.Log("from " + inst.sourceStats.GetClusterShortSummary(client))

	ws.Log("to " + RedactedURI(inst.Target))
	if client, err = GetMongoClient(inst.Target); err != nil {
		return fmt.Errorf("GetMongoClient for target failed: %w", err)
	}
	ws.Log("to " + inst.targetStats.GetClusterShortSummary(client))
	return nil
}
// CreateTaskIndexes builds two compound indexes on the tasks collection of
// the ws.dbName database: {status, replica_set, _id} and
// {replica_set, parent_id}, all ascending.
//
// NOTE(review): the task helpers in this file read and write tasks through
// the MetaDBName constant, not ws.dbName — confirm the two are the same.
func (ws *Workspace) CreateTaskIndexes() error {
	client, err := GetMongoClient(ws.dbURI)
	if err != nil {
		return fmt.Errorf("GetMongoClient failed: %v", err)
	}
	byStatus := bson.D{{Key: "status", Value: 1}, {Key: "replica_set", Value: 1}, {Key: "_id", Value: 1}}
	byParent := bson.D{{Key: "replica_set", Value: 1}, {Key: "parent_id", Value: 1}}
	models := []mongo.IndexModel{{Keys: byStatus}, {Keys: byParent}}

	tasks := client.Database(ws.dbName).Collection(MetaTasks)
	_, err = tasks.Indexes().CreateMany(context.Background(), models)
	return err
}
// Log inserts one entry, {_id: <current time>, status: status}, into the logs
// collection of the MetaDBName database and returns any insert error.
//
// NOTE(review): BSON datetimes have millisecond precision, so two calls in the
// same millisecond could produce the same _id. The second insert would then
// fail with a duplicate-key error.
func (ws *Workspace) Log(status string) error {
	client, err := GetMongoClient(ws.dbURI)
	if err != nil {
		return fmt.Errorf("GetMongoClient failed: %v", err)
	}
	entry := bson.M{"_id": time.Now(), "status": status}
	logs := client.Database(MetaDBName).Collection(MetaLogs)
	_, err = logs.InsertOne(context.Background(), entry)
	return err
}
// InsertTasks stores tasks in the tasks collection of the MetaDBName database
// with a single InsertMany call.
//
// Each task is first round-tripped through bson.Marshal/bson.Unmarshal into a
// bson.D. A marshaling problem is therefore reported, naming the failing step,
// before anything is sent to the server. InsertMany rejects an empty document
// list, so an empty (or nil) tasks slice returns nil without contacting the
// server.
func (ws *Workspace) InsertTasks(tasks []*Task) error {
	if len(tasks) == 0 {
		return nil
	}
	client, err := GetMongoClient(ws.dbURI)
	if err != nil {
		return fmt.Errorf("GetMongoClient failed: %w", err)
	}
	docs := make([]interface{}, 0, len(tasks))
	for _, task := range tasks {
		data, err := bson.Marshal(task)
		if err != nil {
			return fmt.Errorf("Marshal failed: %w", err)
		}
		var doc bson.D
		if err = bson.Unmarshal(data, &doc); err != nil {
			return fmt.Errorf("Unmarshal failed: %w", err)
		}
		docs = append(docs, doc)
	}
	_, err = client.Database(MetaDBName).Collection(MetaTasks).InsertMany(context.Background(), docs)
	return err
}
// UpdateTask writes task's status, source_counts, begin_time, end_time and
// updated_by to its document (matched by _id) in the tasks collection of the
// MetaDBName database. The inserted count is written only for tasks whose
// status is TaskCompleted.
//
// When a completed task has a ParentID and a non-zero Inserted count, that
// count is also added ($inc) to the parent's "inserted" field. An error is
// returned if either update fails or matches no document.
func (ws *Workspace) UpdateTask(task *Task) error {
	client, err := GetMongoClient(ws.dbURI)
	if err != nil {
		return fmt.Errorf("GetMongoClient failed: %w", err)
	}
	ctx := context.Background()
	var result *mongo.UpdateResult
	coll := client.Database(MetaDBName).Collection(MetaTasks)
	doc := bson.M{"status": task.Status, "source_counts": task.SourceCounts,
		"begin_time": task.BeginTime, "end_time": task.EndTime, "updated_by": task.UpdatedBy}
	if task.Status == TaskCompleted {
		doc["inserted"] = task.Inserted
	}
	if result, err = coll.UpdateOne(ctx, bson.M{"_id": task.ID}, bson.M{"$set": doc}); err != nil {
		return fmt.Errorf("UpdateOne failed: %w", err)
	}
	if result.MatchedCount == 0 {
		return fmt.Errorf(`no matched task updated: "%v"`, task.ID)
	}
	if task.ParentID == nil || task.Inserted == 0 || task.Status != TaskCompleted { // no parent to update
		return nil
	}
	if result, err = coll.UpdateOne(ctx, bson.M{"_id": task.ParentID},
		bson.M{"$inc": bson.M{"inserted": task.Inserted}}); err != nil {
		return fmt.Errorf("UpdateOne parent %v failed: %w", task.ParentID, err)
	}
	if result.MatchedCount == 0 {
		// err is always nil here, so it is deliberately not part of the message.
		return fmt.Errorf(`no matched parent task updated: "%v"`, task.ParentID)
	}
	return nil
}
// FindNextTaskAndUpdate claims the next task with status TaskAdded in a single
// FindOneAndUpdate. The search is limited to replica set replset unless
// replset is empty. The claimed task's status becomes TaskProcessing,
// begin_time becomes the current time, and updated_by becomes updatedBy.
//
// Candidates are sorted by replica_set ascending, then by parent_id in
// direction rev (presumably 1 or -1 — confirm with callers). The post-update
// document is returned. Any error from the call or from decoding, including
// the "no documents" case, is returned unchanged.
func (ws *Workspace) FindNextTaskAndUpdate(replset string, updatedBy string, rev int) (*Task, error) {
	client, err := GetMongoClient(ws.dbURI)
	if err != nil {
		return nil, fmt.Errorf("GetMongoClient failed: %v", err)
	}

	filter := bson.D{{Key: "status", Value: TaskAdded}}
	if replset != "" {
		filter = append(filter, bson.E{Key: "replica_set", Value: replset})
	}
	opts := options.FindOneAndUpdate().
		SetReturnDocument(options.After).
		SetSort(bson.D{{Key: "replica_set", Value: 1}, {Key: "parent_id", Value: rev}})
	claim := bson.M{"$set": bson.M{"status": TaskProcessing, "begin_time": time.Now(), "updated_by": updatedBy}}

	tasks := client.Database(MetaDBName).Collection(MetaTasks)
	var task Task
	if err := tasks.FindOneAndUpdate(context.Background(), filter, claim, opts).Decode(&task); err != nil {
		return nil, err
	}
	return &task, nil
}
// CountAllStatus counts the tasks in the tasks collection of the MetaDBName
// database, grouped by status. It fills the Added, Completed, Failed,
// Processing and Splitting fields of TaskStatusCounts. Statuses outside that
// set are ignored.
//
// Group results that fail to decode are skipped. If the cursor reports an
// error while iterating, that error is returned along with the counts
// gathered so far.
func (ws *Workspace) CountAllStatus() (TaskStatusCounts, error) {
	client, err := GetMongoClient(ws.dbURI)
	if err != nil {
		return TaskStatusCounts{}, fmt.Errorf("GetMongoClient failed: %w", err)
	}
	var counts TaskStatusCounts
	ctx := context.Background()
	pipeline := `[
{
"$sort": { "status": 1 }
}, {
"$group": {
"_id": "$status",
"count": { "$sum": 1 }
}
}
]`
	coll := client.Database(MetaDBName).Collection(MetaTasks)
	optsAgg := options.Aggregate().SetAllowDiskUse(true)
	cursor, err := coll.Aggregate(ctx, mdb.MongoPipeline(pipeline), optsAgg)
	if err != nil {
		return counts, err
	}
	defer cursor.Close(ctx)
	for cursor.Next(ctx) {
		var doc bson.M
		if err := cursor.Decode(&doc); err != nil {
			continue // skip a group result that cannot be decoded
		}
		// Tagless switch: equivalent to the == comparisons on the interface
		// value, and safe even if two status constants share a value.
		switch id := doc["_id"]; {
		case id == TaskAdded:
			counts.Added = ToInt32(doc["count"])
		case id == TaskCompleted:
			counts.Completed = ToInt32(doc["count"])
		case id == TaskFailed:
			counts.Failed = ToInt32(doc["count"])
		case id == TaskProcessing:
			counts.Processing = ToInt32(doc["count"])
		case id == TaskSplitting:
			counts.Splitting = ToInt32(doc["count"])
		}
	}
	return counts, cursor.Err()
}
// ResetLongRunningTasks returns stalled work to the queue. Every task still in
// TaskProcessing whose begin_time is earlier than time.Now().Add(ago) is set
// back to TaskAdded, with begin_time zeroed and updated_by set to "maid".
//
// ago must be negative (e.g. -30*time.Minute selects tasks that began more
// than 30 minutes ago). It returns the number of modified tasks. On error the
// count is 0.
func (ws *Workspace) ResetLongRunningTasks(ago time.Duration) (int, error) {
	// Validate the argument before any connection is attempted.
	if ago >= 0 {
		return 0, fmt.Errorf("invalid past time %v, should be negative", ago)
	}
	client, err := GetMongoClient(ws.dbURI)
	if err != nil {
		return 0, fmt.Errorf("GetMongoClient failed: %w", err)
	}
	ctx := context.Background()
	coll := client.Database(MetaDBName).Collection(MetaTasks)
	updates := bson.M{"$set": bson.M{"status": TaskAdded, "begin_time": time.Time{}, "updated_by": "maid"}}
	filter := bson.D{{Key: "status", Value: TaskProcessing}, {Key: "begin_time", Value: bson.M{"$lt": time.Now().Add(ago)}}}
	result, err := coll.UpdateMany(ctx, filter, updates)
	if err != nil {
		// result is nil when UpdateMany fails; it must not be dereferenced.
		return 0, err
	}
	return int(result.ModifiedCount), nil
}
// SaveOplogTimestamp stores ts for the shard or replica set named setName.
// It upserts the document {_id: setName, ts: ts} in the oplogs collection of
// the MetaDBName database and returns any update error.
func (ws *Workspace) SaveOplogTimestamp(setName string, ts primitive.Timestamp) error {
	client, err := GetMongoClient(ws.dbURI)
	if err != nil {
		return fmt.Errorf("GetMongoClient failed: %v", err)
	}
	oplogs := client.Database(MetaDBName).Collection(MetaOplogs)
	_, err = oplogs.UpdateOne(
		context.Background(),
		bson.M{"_id": setName},
		bson.M{"$set": bson.M{"ts": ts}},
		options.Update().SetUpsert(true),
	)
	return err
}
// GetOplogTimestamp returns the timestamp stored for setName in the oplogs
// collection of the MetaDBName database (see SaveOplogTimestamp).
//
// It returns nil when no client can be obtained, when no document exists for
// setName, or when the stored ts field is missing or not a BSON timestamp.
func (ws *Workspace) GetOplogTimestamp(setName string) *primitive.Timestamp {
	client, err := GetMongoClient(ws.dbURI)
	if err != nil {
		return nil
	}
	coll := client.Database(MetaDBName).Collection(MetaOplogs)
	// Only the ts field is needed, so fetch just that.
	opts := options.FindOne().SetProjection(bson.M{"ts": 1, "_id": 0})
	var doc bson.M
	if err = coll.FindOne(context.Background(), bson.M{"_id": setName}, opts).Decode(&doc); err != nil {
		return nil
	}
	if ts, ok := doc["ts"].(primitive.Timestamp); ok {
		return &ts
	}
	return nil
}
// FindAllParentTasks returns every task in the tasks collection of the
// MetaDBName database matched by the filter {parent_id: nil}. In MongoDB that
// filter matches documents whose parent_id is null or missing.
//
// The result is a non-nil (possibly empty) slice. A task that fails to decode,
// or a cursor error during iteration, aborts the call with an error.
func (ws *Workspace) FindAllParentTasks() ([]*Task, error) {
	client, err := GetMongoClient(ws.dbURI)
	if err != nil {
		return nil, fmt.Errorf("GetMongoClient failed: %w", err)
	}
	ctx := context.Background()
	filter := bson.D{{Key: "parent_id", Value: nil}}
	coll := client.Database(MetaDBName).Collection(MetaTasks)
	cursor, err := coll.Find(ctx, filter)
	if err != nil {
		return nil, fmt.Errorf("find %v failed: %w", filter, err)
	}
	defer cursor.Close(ctx)

	tasks := []*Task{}
	for cursor.Next(ctx) {
		var task Task
		if err = cursor.Decode(&task); err != nil {
			return nil, fmt.Errorf("decode task failed: %w", err)
		}
		tasks = append(tasks, &task)
	}
	if err = cursor.Err(); err != nil {
		return nil, fmt.Errorf("find %v failed: %w", filter, err)
	}
	return tasks, nil
}
// ResetParentTask sets task's status back to TaskAdded. It then deletes every
// task whose parent_id equals task.ID, both in the tasks collection of the
// MetaDBName database.
//
// The two writes are separate operations. If the delete fails, the parent has
// already been reset.
func (ws *Workspace) ResetParentTask(task Task) error {
	client, err := GetMongoClient(ws.dbURI)
	if err != nil {
		return fmt.Errorf("GetMongoClient failed: %v", err)
	}
	ctx := context.Background()
	tasks := client.Database(MetaDBName).Collection(MetaTasks)

	reset := bson.M{"$set": bson.M{"status": TaskAdded}}
	if _, err := tasks.UpdateOne(ctx, bson.M{"_id": task.ID}, reset); err != nil {
		return fmt.Errorf("UpdateOne failed: %v", err)
	}
	if _, err := tasks.DeleteMany(ctx, bson.M{"parent_id": task.ID}); err != nil {
		return fmt.Errorf("DeleteMany failed: %v", err)
	}
	return nil
}
// ResetProcessingTasks moves every task currently in TaskProcessing back to
// TaskAdded, however long it has been running. Only the status field is
// changed; begin_time and updated_by are left as they are.
func (ws *Workspace) ResetProcessingTasks() error {
	client, err := GetMongoClient(ws.dbURI)
	if err != nil {
		return fmt.Errorf("GetMongoClient failed: %v", err)
	}
	filter := bson.M{"status": TaskProcessing}
	update := bson.M{"$set": bson.M{"status": TaskAdded}}
	tasks := client.Database(MetaDBName).Collection(MetaTasks)
	if _, err = tasks.UpdateMany(context.Background(), filter, update); err != nil {
		return fmt.Errorf("UpdateMany failed: %v", err)
	}
	return nil
}