Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport schema feature from dolphinbeat #358

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 38 additions & 81 deletions canal/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@ type Canal struct {
syncer *replication.BinlogSyncer

eventHandler EventHandler
observer Observer

connLock sync.Mutex
conn *client.Conn

tableLock sync.RWMutex
tables map[string]*schema.Table
errorTablesGetTime map[string]time.Time
tracker *schema.SchemaTracker

tableLock sync.RWMutex
tableMatchCache map[string]bool

tableMatchCache map[string]bool
includeTableRegex []*regexp.Regexp
excludeTableRegex []*regexp.Regexp

Expand All @@ -55,6 +56,8 @@ var UnknownTableRetryPeriod = time.Second * time.Duration(10)
var ErrExcludedTable = errors.New("excluded table meta")

func NewCanal(cfg *Config) (*Canal, error) {
var err error

c := new(Canal)
c.cfg = cfg

Expand All @@ -63,13 +66,11 @@ func NewCanal(cfg *Config) (*Canal, error) {
c.dumpDoneCh = make(chan struct{})
c.eventHandler = &DummyEventHandler{}

c.tables = make(map[string]*schema.Table)
if c.cfg.DiscardNoMetaRowEvent {
c.errorTablesGetTime = make(map[string]time.Time)
}
c.master = &masterInfo{}

var err error
if err = c.prepareTracker(); err != nil {
return nil, errors.Trace(err)
}

if err = c.prepareDumper(); err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -166,6 +167,24 @@ func (c *Canal) prepareDumper() error {
return nil
}

func (c *Canal) prepareTracker() error {
var err error
trackerCfg := &schema.TrackerConfig{
CharsetServer: "utf8",
Storage: c.cfg.Tracker.Storage,
Dir: c.cfg.Tracker.Dir,
Addr: c.cfg.Tracker.Addr,
User: c.cfg.Tracker.User,
Password: c.cfg.Tracker.Password,
Database: c.cfg.Tracker.Database,
}
c.tracker, err = schema.NewSchemaTracker(trackerCfg)
if err != nil {
return errors.Trace(err)
}
return nil
}

// Run will first try to dump all data from MySQL master `mysqldump`,
// then sync from the binlog position in the dump data.
// It will run forever until meeting an error or Canal closed.
Expand Down Expand Up @@ -285,87 +304,25 @@ func (c *Canal) checkTableMatch(key string) bool {
return matchFlag
}

func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
func (c *Canal) GetTable(db string, table string) (*schema.TableDef, error) {
key := fmt.Sprintf("%s.%s", db, table)
// if table is excluded, return error and skip parsing event or dump
if !c.checkTableMatch(key) {
return nil, ErrExcludedTable
}
c.tableLock.RLock()
t, ok := c.tables[key]
c.tableLock.RUnlock()

if ok {
return t, nil
}

if c.cfg.DiscardNoMetaRowEvent {
c.tableLock.RLock()
lastTime, ok := c.errorTablesGetTime[key]
c.tableLock.RUnlock()
if ok && time.Now().Sub(lastTime) < UnknownTableRetryPeriod {
return nil, schema.ErrMissingTableMeta
}
}

t, err := schema.NewTable(c, db, table)
if err != nil {
// check table not exists
if ok, err1 := schema.IsTableExist(c, db, table); err1 == nil && !ok {
return nil, schema.ErrTableNotExist
}
// work around : RDS HAHeartBeat
// ref : https://github.com/alibaba/canal/blob/master/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L385
// issue : https://github.com/alibaba/canal/issues/222
// This is a common error in RDS that canal can't get HAHealthCheckSchema's meta, so we mock a table meta.
// If canal just skip and log error, as RDS HA heartbeat interval is very short, so too many HAHeartBeat errors will be logged.
if key == schema.HAHealthCheckSchema {
// mock ha_health_check meta
ta := &schema.Table{
Schema: db,
Name: table,
Columns: make([]schema.TableColumn, 0, 2),
Indexes: make([]*schema.Index, 0),
}
ta.AddColumn("id", "bigint(20)", "", "")
ta.AddColumn("type", "char(1)", "", "")
c.tableLock.Lock()
c.tables[key] = ta
c.tableLock.Unlock()
return ta, nil
}
// if DiscardNoMetaRowEvent is true, we just log this error
if c.cfg.DiscardNoMetaRowEvent {
c.tableLock.Lock()
c.errorTablesGetTime[key] = time.Now()
c.tableLock.Unlock()
// log error and return ErrMissingTableMeta
log.Errorf("canal get table meta err: %v", errors.Trace(err))
return nil, schema.ErrMissingTableMeta
}
return nil, err
}
return c.tracker.GetTableDef(db, table)
}

c.tableLock.Lock()
c.tables[key] = t
if c.cfg.DiscardNoMetaRowEvent {
// if get table info success, delete this key from errorTablesGetTime
delete(c.errorTablesGetTime, key)
}
c.tableLock.Unlock()
func (c *Canal) GetDatabases() []string {
return c.tracker.GetDatabases()
}

return t, nil
func (c *Canal) GetTables(db string) ([]string, error) {
return c.tracker.GetTables(db)
}

// ClearTableCache clear table cache
func (c *Canal) ClearTableCache(db []byte, table []byte) {
key := fmt.Sprintf("%s.%s", db, table)
c.tableLock.Lock()
delete(c.tables, key)
if c.cfg.DiscardNoMetaRowEvent {
delete(c.errorTablesGetTime, key)
}
c.tableLock.Unlock()
func (c *Canal) ExecDDL(db string, statement string) error {
return c.tracker.Exec(db, statement)
}

// Check MySQL binlog row image, must be in FULL, MINIMAL, NOBLOB
Expand Down
10 changes: 6 additions & 4 deletions canal/canal_test.go
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
)

var testHost = flag.String("host", "127.0.0.1", "MySQL host")
var testPort = flag.Int("port", 3306, "MySQL port")
var testUser = flag.String("user", "root", "MySQL user")
var testPassword = flag.String("password", "", "MySQL password")

func Test(t *testing.T) {
TestingT(t)
Expand All @@ -27,8 +30,9 @@ var _ = Suite(&canalTestSuite{})

func (s *canalTestSuite) SetUpSuite(c *C) {
cfg := NewDefaultConfig()
cfg.Addr = fmt.Sprintf("%s:3306", *testHost)
cfg.User = "root"
cfg.Addr = fmt.Sprintf("%s:%d", *testHost, *testPort)
cfg.User = *testUser
cfg.Password = *testPassword
cfg.HeartbeatPeriod = 200 * time.Millisecond
cfg.ReadTimeout = 300 * time.Millisecond
cfg.Dump.ExecutionPath = "mysqldump"
Expand Down Expand Up @@ -61,8 +65,6 @@ func (s *canalTestSuite) SetUpSuite(c *C) {
s.execute(c, "DELETE FROM test.canal_test")
s.execute(c, "INSERT INTO test.canal_test (content, name) VALUES (?, ?), (?, ?), (?, ?)", "1", "a", `\0\ndsfasdf`, "b", "", "c")

s.execute(c, "SET GLOBAL binlog_format = 'ROW'")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because my account dosen't have super privilege to set this global variable. And I think it's ok if mysql is configured to use ROW format already, like the travis config.


s.c.SetEventHandler(&testEventHandler{c: c})
go func() {
err = s.c.Run()
Expand Down
27 changes: 24 additions & 3 deletions canal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/schema"
)

type DumpConfig struct {
Expand Down Expand Up @@ -41,6 +42,24 @@ type DumpConfig struct {
Protocol string `toml:"protocol"`
}

type TrackerConfig struct {
// The charset_set_server of source mysql, we need
// this charset to handle ddl statement
CharsetServer string `toml:"charset_server"`

// Storage type to store schema data, may be boltdb or mysql
Storage string `toml:"storage"`

// Boltdb file path to store data
Dir string `toml:"dir"`

// MySQL info to connect
Addr string `toml:"addr"`
User string `toml:"user"`
Password string `toml:"password"`
Database string `toml:"database"`
}

type Config struct {
Addr string `toml:"addr"`
User string `toml:"user"`
Expand All @@ -60,11 +79,10 @@ type Config struct {
IncludeTableRegex []string `toml:"include_table_regex"`
ExcludeTableRegex []string `toml:"exclude_table_regex"`

// discard row event without table meta
DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"`

Dump DumpConfig `toml:"dump"`

Tracker TrackerConfig `toml:"schema_tracker"`

UseDecimal bool `toml:"use_decimal"`
ParseTime bool `toml:"parse_time"`

Expand Down Expand Up @@ -112,5 +130,8 @@ func NewDefaultConfig() *Config {
c.Dump.DiscardErr = true
c.Dump.SkipMasterData = false

c.Tracker.Storage = schema.StorageType_Boltdb
c.Tracker.Dir = "."

return c
}
55 changes: 45 additions & 10 deletions canal/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ func (h *dumpParseHandler) BinLog(name string, pos uint64) error {
return nil
}

func (h *dumpParseHandler) DDL(db string, statement string) error {
if err := h.c.ctx.Err(); err != nil {
return err
}
if err := h.c.tracker.Exec(db, statement); err != nil {
return err
}
return nil
}

func (h *dumpParseHandler) Data(db string, table string, values []string) error {
if err := h.c.ctx.Err(); err != nil {
return err
Expand All @@ -35,9 +45,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error
tableInfo, err := h.c.GetTable(db, table)
if err != nil {
e := errors.Cause(err)
if e == ErrExcludedTable ||
e == schema.ErrTableNotExist ||
e == schema.ErrMissingTableMeta {
if e == ErrExcludedTable {
return nil
}
log.Errorf("get %s.%s information err: %v", db, table, err)
Expand All @@ -52,19 +60,23 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error
} else if v == "_binary ''" {
vs[i] = []byte{}
} else if v[0] != '\'' {
if tableInfo.Columns[i].Type == schema.TYPE_NUMBER {
if tableInfo.Columns[i].InnerType == schema.TypeShort ||
tableInfo.Columns[i].InnerType == schema.TypeInt24 ||
tableInfo.Columns[i].InnerType == schema.TypeLong ||
tableInfo.Columns[i].InnerType == schema.TypeLonglong {
n, err := strconv.ParseInt(v, 10, 64)
if err != nil {
return fmt.Errorf("parse row %v at %d error %v, int expected", values, i, err)
}
vs[i] = n
} else if tableInfo.Columns[i].Type == schema.TYPE_FLOAT {
} else if tableInfo.Columns[i].InnerType == schema.TypeFloat {
f, err := strconv.ParseFloat(v, 64)
if err != nil {
return fmt.Errorf("parse row %v at %d error %v, float expected", values, i, err)
}
vs[i] = f
} else if tableInfo.Columns[i].Type == schema.TYPE_DECIMAL {
} else if tableInfo.Columns[i].InnerType == schema.TypeDecimal ||
tableInfo.Columns[i].InnerType == schema.TypeNewDecimal {
if h.c.cfg.UseDecimal {
d, err := decimal.NewFromString(v)
if err != nil {
Expand Down Expand Up @@ -158,9 +170,6 @@ func (c *Canal) dump() error {

pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)}
c.master.Update(pos)
if err := c.eventHandler.OnPosSynced(pos, true); err != nil {
return errors.Trace(err)
}
var startPos fmt.Stringer = pos
if h.gset != nil {
c.master.UpdateGTIDSet(h.gset)
Expand All @@ -172,6 +181,8 @@ func (c *Canal) dump() error {
}

func (c *Canal) tryDump() error {
var err error

pos := c.master.Position()
gset := c.master.GTIDSet()
if (len(pos.Name) > 0 && pos.Pos > 0) ||
Expand All @@ -186,5 +197,29 @@ func (c *Canal) tryDump() error {
return nil
}

return c.dump()
// Reset schema info
err = c.tracker.Reset()
if err != nil {
return err
}

err = c.dump()
if err != nil {
return err
}

// Tell schema tracker to make a snapshot
pos = c.master.Position()
err = c.tracker.Persist(pos)
if err != nil {
return err
}

// If data and schema in backup file are persisted,
// we tell event handler to save the pos
if err = c.eventHandler.OnPosSynced(pos, true); err != nil {
return errors.Trace(err)
}

return nil
}
30 changes: 30 additions & 0 deletions canal/hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package canal

type Observer struct {
BeforeSchemaChange func(string, string) error
OnSchemaChangeFailed func(string, string, error) (bool, error)
}

// Register a hook that will be called before schema change
func (c *Canal) RegisterBeforeSchemaChangeHook(fn func(string, string) error) {
c.observer.BeforeSchemaChange = fn
}

// Register a hook that will be called on DDL failed
func (c *Canal) RegisterOnSchemaChangeFailedHook(fn func(string, string, error) (bool, error)) {
c.observer.OnSchemaChangeFailed = fn
}

func (c *Canal) runBeforeSchemaChangeHook(db string, statement string) error {
if c.observer.BeforeSchemaChange == nil {
return nil
}
return c.observer.BeforeSchemaChange(db, statement)
}

func (c *Canal) runOnSchemaChangeFailedHook(db string, statement string, err error) (bool, error) {
if c.observer.OnSchemaChangeFailed == nil {
return false, err
}
return c.observer.OnSchemaChangeFailed(db, statement, err)
}
Loading