-
Notifications
You must be signed in to change notification settings - Fork 9
/
resume.go
82 lines (76 loc) · 1.92 KB
/
resume.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Copyright Kuei-chun Chen, 2022-present. All rights reserved.
package hummingbird
import (
"fmt"
"log"
"github.com/simagix/gox"
)
// Resume resumes a migration
func Resume(filename string, extra ...bool) error {
var err error
var isData, isOplog bool
logger := gox.GetLogger()
inst, err := NewMigratorInstance(filename)
if err != nil {
return fmt.Errorf("NewMigratorInstance failed: %v", err)
}
ws := inst.Workspace()
status := fmt.Sprintf("resume a migration from %v", filename)
logger.Remark(status)
if err = ws.Log(status); err != nil {
return fmt.Errorf("update status failed: %v", err)
}
if inst.Command == CommandAll {
isData = true
isOplog = true
} else if inst.Command == CommandData || inst.Command == CommandDataOnly {
if inst.IsDrop {
return fmt.Errorf(`cannot use {"drop": true} with {"command": "%v"}`, inst.Command)
}
isData = true
if inst.Command == CommandData {
isOplog = true
}
} else {
return fmt.Errorf("unsupported command %v", inst.Command)
}
wg := gox.NewWaitGroup(4)
if len(extra) == 0 {
wg.Add(1)
go func(port int) {
defer wg.Done()
if err := StartWebServer(port); err != nil {
log.Fatalf("StartWebServer failed: %v", err)
}
}(inst.Port)
}
if isData {
if err = inst.CheckIfBalancersDisabled(); err != nil { // if balancer is running, exits
return fmt.Errorf("CheckIfBalancersDisabled failed: %v", err)
}
}
tasks, err := ws.FindAllParentTasks()
if err != nil {
return fmt.Errorf("FindAllParentTasks failed %v", err)
}
for _, task := range tasks {
if task.Status == TaskSplitting {
ws.ResetParentTask(*task)
}
}
ws.ResetProcessingTasks()
if isOplog {
if err = OplogStreamers(); err != nil {
return fmt.Errorf("OplogStreamers failed: %v", err)
}
}
if isData {
if err = DataCopier(); err != nil {
return fmt.Errorf("DataCopier failed: %v", err)
}
}
inst.NotifyWorkerExit()
inst.LiveStreamingOplogs()
wg.Wait()
return nil
}