diff --git a/cmd/mongo/oplog_push.go b/cmd/mongo/oplog_push.go
index 6f699e160..c9b0fc675 100644
--- a/cmd/mongo/oplog_push.go
+++ b/cmd/mongo/oplog_push.go
@@ -199,7 +199,7 @@ func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
 		return
 	}
 
-	return
+	return args, nil
 }
 
 type oplogPushStatsArgs struct {
diff --git a/cmd/mongo/oplog_replay.go b/cmd/mongo/oplog_replay.go
index c2a14deca..dafe5a248 100644
--- a/cmd/mongo/oplog_replay.go
+++ b/cmd/mongo/oplog_replay.go
@@ -96,24 +96,24 @@ func buildOplogReplayRunArgs(cmdargs []string) (args oplogReplayRunArgs, err err
 	return args, nil
 }
 
-func processArg(arg string, downloader *archive.StorageDownloader) (models.Timestamp, error) {
-	switch arg {
-	case internal.LatestString:
-		return downloader.LastKnownArchiveTS()
-	case LatestBackupString:
-		lastBackupName, err := downloader.LastBackupName()
-		if err != nil {
-			return models.Timestamp{}, err
-		}
-		backupMeta, err := downloader.BackupMeta(lastBackupName)
-		if err != nil {
-			return models.Timestamp{}, err
-		}
-		return models.TimestampFromBson(backupMeta.MongoMeta.BackupLastTS), nil
-	default:
-		return models.TimestampFromStr(arg)
-	}
-}
+//func processArg(arg string, downloader *archive.StorageDownloader) (models.Timestamp, error) {
+//	switch arg {
+//	case internal.LatestString:
+//		return downloader.LastKnownArchiveTS()
+//	case LatestBackupString:
+//		lastBackupName, err := downloader.LastBackupName()
+//		if err != nil {
+//			return models.Timestamp{}, err
+//		}
+//		backupMeta, err := downloader.BackupMeta(lastBackupName)
+//		if err != nil {
+//			return models.Timestamp{}, err
+//		}
+//		return models.TimestampFromBson(backupMeta.MongoMeta.BackupLastTS), nil
+//	default:
+//		return models.TimestampFromStr(arg)
+//	}
+//}
 
 func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
 	tracelog.DebugLogger.Printf("starting replay with arguments: %+v", replayArgs)
@@ -138,8 +138,8 @@ func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
 		return err
 	}
 
-	filterList := shake.OplogFilterChain{new(shake.AutologousFilter), new(shake.NoopFilter), new(shake.DDLFilter)}
-	dbApplier := oplog.NewDBApplier(mongoClient, false, replayArgs.ignoreErrCodes, replayArgs.dbNode, filterList)
+	filterList := shake.OplogFilterChain{new(shake.AutologousFilter), new(shake.NoopFilter)}
+	dbApplier := oplog.NewDBApplier(mongoClient, true, replayArgs.ignoreErrCodes, replayArgs.dbNode, filterList)
 	oplogApplier := stages.NewGenericApplier(dbApplier)
 
 	// set up storage downloader client
diff --git a/internal/databases/mongo/archive/loader.go b/internal/databases/mongo/archive/loader.go
index 2f5cd4dee..722254470 100644
--- a/internal/databases/mongo/archive/loader.go
+++ b/internal/databases/mongo/archive/loader.go
@@ -132,7 +132,8 @@ func (sd *StorageDownloader) LastBackupName() (string, error) {
 
 // DownloadOplogArchive downloads, decompresses and decrypts (if needed) oplog archive.
 func (sd *StorageDownloader) DownloadOplogArchive(arch models.Archive, writeCloser io.WriteCloser) error {
-	return internal.DownloadFile(internal.NewFolderReader(sd.oplogsFolder), arch.DBNodeSpecificFileName(sd.GetNodeSpecificDownloader()), arch.Extension(), writeCloser)
+	return internal.DownloadFile(internal.NewFolderReader(sd.oplogsFolder),
+		arch.DBNodeSpecificFileName(sd.GetNodeSpecificDownloader()), arch.Extension(), writeCloser)
 }
 
 // ListOplogArchives fetches all oplog archives existed in storage.
@@ -323,7 +324,8 @@ func (su *StorageUploader) UploadGapArchive(archErr error, firstTS, lastTS model
 		return fmt.Errorf("can not build archive: %w", err)
 	}
 
-	if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()), arch.DBNodeSpecificFileName(su.dbNode)); err != nil {
+	if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()),
+		arch.DBNodeSpecificFileName(su.dbNode)); err != nil {
 		return fmt.Errorf("error while uploading stream: %w", err)
 	}
 	return nil
diff --git a/internal/databases/mongo/client/client.go b/internal/databases/mongo/client/client.go
index 9bec7774e..9ffc589fc 100644
--- a/internal/databases/mongo/client/client.go
+++ b/internal/databases/mongo/client/client.go
@@ -350,10 +350,6 @@ func (mc *MongoClient) ApplyOp(ctx context.Context, dbop db.Oplog) error {
 	cmd[0] = bson.E{Key: "applyOps", Value: []interface{}{op}}
 	apply := mc.c.Database("admin").RunCommand(ctx, cmd)
 	if err := apply.Err(); err != nil {
-		fmt.Println("-------------------------------------", err)
-		if mongo.IsDuplicateKeyError(err) {
-			return nil
-		}
 		return err
 	}
 	resp := CmdResponse{}
diff --git a/internal/databases/mongo/oplog/applier.go b/internal/databases/mongo/oplog/applier.go
index 55c1c7e34..1f78d7cfc 100644
--- a/internal/databases/mongo/oplog/applier.go
+++ b/internal/databases/mongo/oplog/applier.go
@@ -13,7 +13,6 @@ import (
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
 	"io"
-	"strings"
 )
 
 type TypeAssertionError struct {
@@ -83,8 +82,10 @@ type DBApplier struct {
 }
 
 // NewDBApplier builds DBApplier with given args.
-func NewDBApplier(m client.MongoDriver, preserveUUID bool, ignoreErrCodes map[string][]int32, node string, filterList shake.OplogFilterChain) *DBApplier {
-	return &DBApplier{db: m, txnBuffer: txn.NewBuffer(), preserveUUID: preserveUUID, applyIgnoreErrorCodes: ignoreErrCodes, dbNode: node, filterList: filterList}
+func NewDBApplier(m client.MongoDriver, preserveUUID bool, ignoreErrCodes map[string][]int32,
+	node string, filterList shake.OplogFilterChain) *DBApplier {
+	return &DBApplier{db: m, txnBuffer: txn.NewBuffer(), preserveUUID: preserveUUID,
+		applyIgnoreErrorCodes: ignoreErrCodes, dbNode: node, filterList: filterList}
 }
 
 func (ap *DBApplier) Apply(ctx context.Context, opr models.Oplog) error {
@@ -99,17 +100,12 @@ func (ap *DBApplier) Apply(ctx context.Context, opr models.Oplog) error {
 		return nil
 	}
 
-	if strings.HasPrefix(ap.dbNode, "shard") {
+	if ap.dbNode != "configsvr" {
 		if ap.filterList.IterateFilter(&op) {
 			return nil
 		}
 	}
 
-	//if err := ap.shouldSkip(op.Operation, op.Namespace); err != nil {
-	//	tracelog.DebugLogger.Printf("skipping op %+v due to: %+v", op, err)
-	//	return nil
-	//}
-
 	meta, err := txn.NewMeta(op)
 	if err != nil {
 		return fmt.Errorf("can not extract op metadata: %w", err)
@@ -142,26 +138,15 @@ func (ap *DBApplier) Close(ctx context.Context) error {
 	return nil
 }
 
-func (ap *DBApplier) shouldSkip(op, ns string) error {
-	if op == "n" {
-		return fmt.Errorf("noop op")
-	}
-
-	// sharded clusters are not supported yet
-	if (strings.HasPrefix(ns, "config.") || strings.HasPrefix(ns, "admin.")) && ap.dbNode != "configsvr" {
-		return fmt.Errorf("config database op")
-	}
-
-	return nil
-}
-
 // shouldIgnore checks if error should be ignored
 func (ap *DBApplier) shouldIgnore(op string, err error) bool {
 	ce, ok := err.(mongo.CommandError)
 	if !ok {
 		return false
 	}
-
+	if mongo.IsDuplicateKeyError(err) {
+		return true
+	}
 	ignoreErrorCodes, ok := ap.applyIgnoreErrorCodes[op]
 	if !ok {
 		return false
@@ -273,8 +258,8 @@ func indexSpecFromCommitIndexBuilds(op db.Oplog) (string, []client.IndexDocument
 		if !ok {
 			return "", nil, NewTypeAssertionError("bson.D", fmt.Sprintf("indexes[%d]", i), elemE.Value)
 		}
-		for i := range elements {
-			elemE = elements[i]
+		for j := range elements {
+			elemE = elements[j]
 			if elemE.Key == "key" {
 				if indexSpecs[i].Key, ok = elemE.Value.(bson.D); !ok {
 					return "", nil, NewTypeAssertionError("bson.D", "key", elemE.Value)
diff --git a/internal/databases/mongo/shake/filter.go b/internal/databases/mongo/shake/filter.go
index 575ab7f0b..7da4a8c60 100644
--- a/internal/databases/mongo/shake/filter.go
+++ b/internal/databases/mongo/shake/filter.go
@@ -1,14 +1,13 @@
 package shake
 
 import (
-	"fmt"
 	"github.com/mongodb/mongo-tools-common/db"
 	"github.com/wal-g/tracelog"
 	"reflect"
 	"strings"
 )
 
-// OplogFilter: AutologousFilter, NoopFilter, DDLFilter
+// OplogFilter: AutologousFilter, NoopFilter
 type OplogFilter interface {
 	Filter(log *db.Oplog) bool
 }
@@ -28,21 +27,16 @@ type AutologousFilter struct {
 }
 
 func (filter *AutologousFilter) Filter(log *db.Oplog) bool {
-	// Filter out unnecessary commands
 	if operation, found := ExtraCommandName(log.Object); found {
-		fmt.Printf("unnecessary commands. operation: %v found: %v\n", operation)
 		if IsNeedFilterCommand(operation) {
 			return true
 		}
 	}
-
-	// for namespace. we filter noop operation and collection name
-	// that are admin, local, mongoshake, mongoshake_conflict
 	return filter.FilterNs(log.Namespace)
 }
 
-// namespace should be filtered.
+// NsShouldBeIgnore for namespaces should be filtered.
 // key: ns, value: true means prefix, false means contain
 var NsShouldBeIgnore = map[string]bool{
 	"admin.": true,
@@ -51,32 +45,33 @@ var NsShouldBeIgnore = map[string]bool{
 	"system.views": false,
 }
 
-// namespace should not be filtered.
 // NsShouldNotBeIgnore has a higher priority than NsShouldBeIgnore
 // key: ns, value: true means prefix, false means contain
 var NsShouldNotBeIgnore = map[string]bool{
-	"admin.$cmd": true,
+	"admin.$cmd":         true,
+	"admin.system.users": false,
+	"admin.system.roles": false,
 }
 
 func (filter *AutologousFilter) FilterNs(namespace string) bool {
 	// for namespace. we filter noop operation and collection name
-	// that are admin, local, config, mongoshake, mongoshake_conflict
+	// that are admin, local, config
 	// v2.4.13, don't filter admin.$cmd which may include transaction
+	// we don't filter admin.system.users and admin.system.roles to retrieve roles and users
 	for key, val := range NsShouldNotBeIgnore {
-		if val == true && strings.HasPrefix(namespace, key) {
+		if val && strings.HasPrefix(namespace, key) {
 			return false
 		}
-		if val == false && strings.Contains(namespace, key) {
+		if !val && strings.Contains(namespace, key) {
 			return false
 		}
 	}
-
 	for key, val := range NsShouldBeIgnore {
-		if val == true && strings.HasPrefix(namespace, key) {
+		if val && strings.HasPrefix(namespace, key) {
 			return true
 		}
-		if val == false && strings.Contains(namespace, key) {
+		if !val && strings.Contains(namespace, key) {
 			return true
 		}
 	}
@@ -90,10 +85,11 @@ func (filter *NoopFilter) Filter(log *db.Oplog) bool {
 	return log.Operation == "n"
 }
 
-type DDLFilter struct {
-}
-
-func (filter *DDLFilter) Filter(log *db.Oplog) bool {
-	operation, _ := ExtraCommandName(log.Object)
-	return log.Operation == "c" && operation != "applyOps" && operation != "create" || strings.HasSuffix(log.Namespace, "system.indexes")
-}
+//type DDLFilter struct {
+//}
+//
+//func (filter *DDLFilter) Filter(log *db.Oplog) bool {
+//	//operation, _ := ExtraCommandName(log.Object)
+//	//return log.Operation == "c" && operation != "applyOps" && operation != "create" || strings.HasSuffix(log.Namespace, "system.indexes")
+//	return false
+//}
diff --git a/internal/databases/mongo/stages/applier.go b/internal/databases/mongo/stages/applier.go
index 8085be2b4..b1883b5d8 100644
--- a/internal/databases/mongo/stages/applier.go
+++ b/internal/databases/mongo/stages/applier.go
@@ -34,7 +34,6 @@ func NewGenericApplier(applier oplog.Applier) *GenericApplier {
 
 // Apply runs working cycle that applies oplog records.
 func (dba *GenericApplier) Apply(ctx context.Context, ch chan *models.Oplog) (chan error, error) {
-
 	errc := make(chan error)
 	go func() {
 		defer close(errc)
diff --git a/internal/databases/mysql/binlog_fetch_handler.go b/internal/databases/mysql/binlog_fetch_handler.go
index 3bb9ef462..19baf8c6a 100644
--- a/internal/databases/mysql/binlog_fetch_handler.go
+++ b/internal/databases/mysql/binlog_fetch_handler.go
@@ -47,11 +47,11 @@ func HandleBinlogFetch(folder storage.Folder, backupName string, untilTS string,
 	tracelog.ErrorLogger.FatalOnError(err)
 	var startTS, endTS, endBinlogTS time.Time
 	if skipStartTime {
-		startTS, endTS, endBinlogTS, err = getEndTimestamps(folder, untilTS, untilBinlogLastModifiedTS)
+		startTS, endTS, endBinlogTS, err = getEndTimestamps(untilTS, untilBinlogLastModifiedTS)
 	} else {
 		startTS, endTS, endBinlogTS, err = getTimestamps(folder, backupName, untilTS, untilBinlogLastModifiedTS)
-		tracelog.ErrorLogger.FatalOnError(err)
 	}
+	tracelog.ErrorLogger.FatalOnError(err)
 
 	handler := newIndexHandler(dstDir)
 
diff --git a/internal/databases/mysql/binlog_replay_handler.go b/internal/databases/mysql/binlog_replay_handler.go
index 118c5e2fd..c8ea17008 100644
--- a/internal/databases/mysql/binlog_replay_handler.go
+++ b/internal/databases/mysql/binlog_replay_handler.go
@@ -112,7 +112,7 @@ func getTimestamps(folder storage.Folder, backupName, untilTS, untilBinlogLastMo
 	return startTS, endTS, endBinlogTS, nil
 }
 
-func getEndTimestamps(folder storage.Folder, untilTS, untilBinlogLastModifiedTS string) (time.Time, time.Time, time.Time, error) {
+func getEndTimestamps(untilTS, untilBinlogLastModifiedTS string) (time.Time, time.Time, time.Time, error) {
 	endTS, err := utility.ParseUntilTS(untilTS)
 	if err != nil {
 		return time.Time{}, time.Time{}, time.Time{}, err