-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
283 lines (246 loc) · 8.07 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
package main
import (
"context"
"crypto/md5"
"fmt"
"io"
"log"
"os"
"strconv"
"sync"
"time"
"cloud.google.com/go/storage"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/cheggaaa/pb/v3"
"github.com/golang/snappy"
"github.com/joho/godotenv"
"golang.org/x/sync/semaphore"
"google.golang.org/api/option"
)
// S3設定
type s3ConfigStruct struct {
Region string
EndPoint string
AccessKey string
SecretKey string
ForcePathStyle bool
Bucket string
}
var s3Config s3ConfigStruct
// GCP設定
type gcpConfigStruct struct {
CredentialsPath string
ProjectID string
Region string
BucketNameSuffix string
}
var gcpConfig gcpConfigStruct
// Webhook設定
var webhookUrl string
var webhookId string
var webhookSecret string
// 並列ダウンロード数
var palalellNum int64 = 5
// フルバックアップかどうか
var fullBackup bool = false
func init() {
// 環境変数の読み込み
err := godotenv.Load(".env")
if err != nil {
log.Fatal("Error: Failed to load .env file")
}
s3Config.EndPoint = os.Getenv("S3_ENDPOINT")
s3Config.Region = os.Getenv("S3_REGION")
s3Config.AccessKey = os.Getenv("S3_ACCESS_KEY")
s3Config.SecretKey = os.Getenv("S3_SECRET_KEY")
s3Config.ForcePathStyle = os.Getenv("S3_FORCE_PATH_STYLE") == "true"
s3Config.Bucket = os.Getenv("S3_BUCKET")
gcpConfig.CredentialsPath = os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
gcpConfig.ProjectID = os.Getenv("GCP_PROJECT_ID")
gcpConfig.Region = os.Getenv("GCS_REGION")
gcpConfig.BucketNameSuffix = os.Getenv("GCS_BUCKET_NAME_SUFFIX")
webhookUrl = os.Getenv("WEBHOOK_URL")
webhookId = os.Getenv("WEBHOOK_ID")
webhookSecret = os.Getenv("WEBHOOK_SECRET")
palalellNum, err = strconv.ParseInt(os.Getenv("PALALELL_NUM"), 10, 64)
if err != nil {
log.Fatalf("Error: Failed to convert PALALELL_NUM to int: %v", err)
}
fullBackup = os.Getenv("FULL_BACKUP") == "true"
}
func main() {
// S3クライアントの作成
s3Credential := credentials.NewStaticCredentialsProvider(s3Config.AccessKey, s3Config.SecretKey, "")
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithCredentialsProvider(s3Credential),
config.WithRegion(s3Config.Region),
)
if err != nil {
log.Fatalf("Error: Failed to load configuration: %v", err)
}
s3Client := s3.NewFromConfig(cfg, func(opt *s3.Options) {
opt.UsePathStyle = s3Config.ForcePathStyle
opt.BaseEndpoint = aws.String(s3Config.EndPoint)
})
// GCSクライアントの作成
ctx := context.Background()
gcsClient, err := storage.NewClient(ctx, option.WithCredentialsFile(gcpConfig.CredentialsPath))
if err != nil {
log.Fatalf("Error: Failed to create GCS client: %v", err)
}
// バックアップ用GCSバケット作成
fmt.Println("Target buckets:")
gcsBucketName := s3Config.Bucket + gcpConfig.BucketNameSuffix
gcsBucketClient := gcsClient.Bucket(gcsBucketName)
gcsBucketAttr, err := gcsBucketClient.Attrs(ctx)
// バケットが存在しない場合は作成
if err == storage.ErrBucketNotExist {
gcsNewBucketAttr := storage.BucketAttrs{
StorageClass: "COLDLINE",
Location: gcpConfig.Region,
VersioningEnabled: true,
// 90日でデータ削除
Lifecycle: storage.Lifecycle{Rules: []storage.LifecycleRule{
{
Action: storage.LifecycleAction{Type: "Delete"},
Condition: storage.LifecycleCondition{AgeInDays: 90},
},
}},
}
if err := gcsBucketClient.Create(ctx, gcpConfig.ProjectID, &gcsNewBucketAttr); err != nil {
log.Fatalf("Error: Failed to create GCS bucket: %v", err)
} else {
fmt.Printf(" - %v -> %v(Created)\n", s3Config.Bucket, gcsBucketName)
}
} else if err != nil {
// その他のエラー
log.Fatalf("Error: Failed to get GCS bucket attributes: %v", err)
} else {
// 既に存在している場合、バケットの状態を確認
if gcsBucketAttr.StorageClass != "COLDLINE" {
log.Fatalf("Error: Bucket storage class is not COLDLINE: %v", gcsBucketAttr.StorageClass)
}
if !gcsBucketAttr.VersioningEnabled {
log.Fatalf("Error: Bucket versioning is not enabled")
}
fmt.Printf(" - %v -> %v(Already exists)\n", s3Config.Bucket, gcsBucketName)
}
// 改行
fmt.Println()
// バックアップ計測用変数
backupStartTime := time.Now()
totalObjects := 0
skippedObjects := 0
totalErrors := 0
executionLimit := semaphore.NewWeighted(palalellNum)
// バックアップ
fmt.Printf("Bucking up objects in %v to %v\n", s3Config.Bucket, gcsBucketName)
// オブジェクトのページネーターを作成
objectPaginator := s3.NewListObjectsV2Paginator(s3Client, &s3.ListObjectsV2Input{
Bucket: aws.String(s3Config.Bucket),
})
// 並列処理用
var wg sync.WaitGroup
// 各オブジェクトについて、エラーを格納する
var errs []error
// 並列処理開始
for {
if !objectPaginator.HasMorePages() {
break
}
// オブジェクト取得
page, err := objectPaginator.NextPage(ctx)
if err != nil {
log.Fatalf("Error: Failed to list objects: %v", err)
}
// プログレスバー
bar := pb.StartNew(len(page.Contents))
for _, object := range page.Contents {
// 並列処理数を制限
wg.Add(1)
executionLimit.Acquire(ctx, 1)
// オブジェクト数をカウント
totalObjects++
go func() {
defer executionLimit.Release(1)
defer wg.Done()
errCh := make(chan error, 1)
go func() {
// S3オブジェクトのダウンロード
s3ObjectOutput, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(s3Config.Bucket),
Key: object.Key,
})
if err != nil {
errCh <- err
return
}
// フルバックアップでない場合、GCSオブジェクトとハッシュを比較
if !fullBackup {
// GCSオブジェクトの存在判定、情報取得
gcsObjectAttrs, err := gcsBucketClient.Object(*object.Key).Attrs(ctx)
// オブジェクトが存在する場合、ハッシュを比較
if err == nil {
s3Hash := md5.New()
// ハッシュ計算
hashWriter := snappy.NewBufferedWriter(s3Hash)
defer hashWriter.Close()
if _, err := io.Copy(hashWriter, s3ObjectOutput.Body); err != nil {
errCh <- err
return
}
hashWriter.Flush()
// ハッシュを比較し、同じだったらスキップ
if fmt.Sprintf("%x", gcsObjectAttrs.MD5) == fmt.Sprintf("%x", s3Hash.Sum(nil)) {
skippedObjects++
errCh <- nil
return
}
}
}
// GCS書き込み用オブジェクト作成
gcsObjectWriter := gcsBucketClient.Object(*object.Key).NewWriter(ctx)
// Snappy圧縮してGCSにアップロード
snappyWriter := snappy.NewBufferedWriter(gcsObjectWriter)
defer snappyWriter.Close()
if _, err := io.Copy(snappyWriter, s3ObjectOutput.Body); err != nil {
errCh <- err
return
}
snappyWriter.Flush()
if err := gcsObjectWriter.Close(); err != nil {
errCh <- err
return
}
errCh <- nil
}()
if err := <-errCh; err != nil {
log.Printf("Error: Failed to backup object %v: %v", *object.Key, err)
errs = append(errs, err)
}
}()
bar.Increment()
}
bar.Finish()
wg.Wait()
}
// エラー数をカウント
totalErrors += len(errs)
// バックアップ終了
backupEndTime := time.Now()
backupDuration := backupEndTime.Sub(backupStartTime)
fmt.Printf("Backup completed: %d objects, %d skipped, %d errors, %v\n", totalObjects, skippedObjects, totalErrors, backupDuration)
// Webhook送信
webhookMessage := fmt.Sprintf(`### オブジェクトストレージのバックアップが保存されました
S3バケット: %s
バックアップ開始時刻: %s
バックアップ所要時間: %f時間
オブジェクト数: %d
スキップされたオブジェクト数: %d
エラー数: %d
`, s3Config.Bucket, backupStartTime.Format("2006/01/02 15:04:05"), backupDuration.Hours(), totalObjects, skippedObjects, totalErrors)
postWebhook(webhookMessage, webhookUrl, webhookId, webhookSecret)
}