Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support download flushed binlog and parse event for cloud comput… #741

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions canal/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type Canal struct {

ctx context.Context
cancel context.CancelFunc

binFileDownloader BinlogFileDownloader
}

// canal will retry fetching unknown table's meta after UnknownTableRetryPeriod
Expand Down
117 changes: 117 additions & 0 deletions canal/local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package canal

import (
"context"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/pingcap/errors"
)

// BinlogFileDownloader downloads the binlog file and return the path to it. It's often used to download binlog backup from RDS service.
type BinlogFileDownloader func(mysql.Position) (localBinFilePath string, err error)

// WithLocalBinlogDownloader registers the local bin file downloader,
// that allows download the backup binlog file from RDS service to local
func (c *Canal) WithLocalBinlogDownloader(d BinlogFileDownloader) {
c.binFileDownloader = d
}

func (c *Canal) adaptLocalBinFileStreamer(remoteBinlogStreamer *replication.BinlogStreamer, err error) (*localBinFileAdapterStreamer, error) {
return &localBinFileAdapterStreamer{
BinlogStreamer: remoteBinlogStreamer,
syncMasterStreamer: remoteBinlogStreamer,
canal: c,
binFileDownloader: c.binFileDownloader,
}, err
}

// localBinFileAdapterStreamer will support to download flushed binlog file for continuous sync in cloud computing platform
type localBinFileAdapterStreamer struct {
*replication.BinlogStreamer // the running streamer, it will be localStreamer or sync master streamer
syncMasterStreamer *replication.BinlogStreamer // syncMasterStreamer is the streamer from canal startSyncer
canal *Canal
binFileDownloader BinlogFileDownloader
}

// GetEvent will auto switch the local and remote streamer to get binlog event if possible.
func (s *localBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) {
if s.binFileDownloader == nil { // not support to use local bin file
return s.BinlogStreamer.GetEvent(ctx)
}

ev, err := s.BinlogStreamer.GetEvent(ctx)

if err == nil {
switch ev.Event.(type) {
case *replication.RotateEvent: // RotateEvent means need to change steamer back to sync master to retry sync
s.BinlogStreamer = s.syncMasterStreamer
}
return ev, err
}

if err == replication.ErrNeedSyncAgain { // restart master if last sync master syncer has error
s.canal.syncer.Close()
_ = s.canal.prepareSyncer()

newStreamer, startErr := s.canal.startSyncer()
if startErr != nil {
return nil, startErr
}
// set all streamer to the new sync master streamer
s.BinlogStreamer = newStreamer
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
s.syncMasterStreamer = newStreamer

ev, err = newStreamer.GetEvent(ctx)
}

mysqlErr, ok := err.(*mysql.MyError)
// only 'Could not find first log' can create local streamer, ignore other errors
if !ok || mysqlErr.Code != mysql.ER_MASTER_FATAL_ERROR_READING_BINLOG ||
mysqlErr.Message != "Could not find first log file name in binary log index file" {
return ev, err
}

s.canal.cfg.Logger.Info("Could not find first log, try to download the local binlog for retry")

// local binlog need next position to find binlog file and begin event
pos := s.canal.master.Position()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we don't check it's a GTID-based replication? For GTID-based replication, filename and position is not dependable to reset replication because upstream master may failover to a new MySQL node and they have same binlog filename but different position.

newStreamer := s.newLocalBinFileStreamer(s.binFileDownloader, pos)
s.BinlogStreamer = newStreamer

return newStreamer.GetEvent(ctx)
}

func (s *localBinFileAdapterStreamer) newLocalBinFileStreamer(download BinlogFileDownloader, position mysql.Position) *replication.BinlogStreamer {
streamer := replication.NewBinlogStreamer()
binFilePath, err := download(position)
if err != nil {
streamer.CloseWithError(errors.New("local binlog file not exist"))
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
return streamer
}

go func(binFilePath string, streamer *replication.BinlogStreamer) {
beginFromHere := false
err := s.canal.syncer.GetBinlogParser().ParseFile(binFilePath, 0, func(be *replication.BinlogEvent) error {
if position.Pos < 4 { // binlog first pos is 4, if pos < 4 means canal gives error position info
return nil
}
if be.Header.LogPos == position.Pos || position.Pos == 4 { // go ahead to check if begin
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
beginFromHere = true
}
if beginFromHere {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to add an error that no matching position for position.Pos? For example user downlaods wrong binlog file

if err := s.canal.syncer.StorePosAndGTID(be); err != nil {
streamer.CloseWithError(err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can return err to let caller streamer.CloseWithError(err) in line 112

return nil
}
streamer.PutEvent(be)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
})
if err != nil {
streamer.CloseWithError(err)
}
}(binFilePath, streamer)

return streamer
}
2 changes: 1 addition & 1 deletion canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
}

func (c *Canal) runSyncBinlog() error {
s, err := c.startSyncer()
s, err := c.adaptLocalBinFileStreamer(c.startSyncer())
if err != nil {
return err
}
Expand Down
13 changes: 13 additions & 0 deletions replication/binlogstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,16 @@ func newBinlogStreamer() *BinlogStreamer {

return s
}

// PutEvent puts event to BinlogStreamer
func (s *BinlogStreamer) PutEvent(ev *BinlogEvent) {
s.ch <- ev
}

func (s *BinlogStreamer) CloseWithError(err error) {
s.closeWithError(err)
}

func NewBinlogStreamer() *BinlogStreamer {
return newBinlogStreamer()
}
51 changes: 31 additions & 20 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,32 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
return errors.Trace(err)
}

if err := b.StorePosAndGTID(e); err != nil {
return errors.Trace(err)
}

needStop := false
select {
case s.ch <- e:
case <-b.ctx.Done():
needStop = true
}

if needACK {
err := b.replySemiSyncACK(b.nextPos)
if err != nil {
return errors.Trace(err)
}
}

if needStop {
return errors.New("sync is been closing...")
}

return nil
}

func (b *BinlogSyncer) StorePosAndGTID(e *BinlogEvent) error {
if e.Header.LogPos > 0 {
// Some events like FormatDescriptionEvent return 0, ignore.
b.nextPos.Pos = e.Header.LogPos
Expand Down Expand Up @@ -830,7 +856,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
break
}
prev := b.currGset.Clone()
err = b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID)
err := b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -847,25 +873,6 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
event.GSet = getCurrentGtidSet()
}
}

needStop := false
select {
case s.ch <- e:
case <-b.ctx.Done():
needStop = true
}

if needACK {
err := b.replySemiSyncACK(b.nextPos)
if err != nil {
return errors.Trace(err)
}
}

if needStop {
return errors.New("sync is been closing...")
}

return nil
}

Expand Down Expand Up @@ -902,3 +909,7 @@ func (b *BinlogSyncer) killConnection(conn *client.Conn, id uint32) {
}
b.cfg.Logger.Infof("kill last connection id %d", id)
}

func (b *BinlogSyncer) GetBinlogParser() *BinlogParser {
return b.parser
}