Skip to content

Commit

Permalink
added query event in dummyEventHandler
Browse files Browse the repository at this point in the history
Signed-off-by: axfor <[email protected]>
  • Loading branch information
axfor committed Jun 13, 2023
1 parent 29bc749 commit 1b71baa
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 17 deletions.
6 changes: 5 additions & 1 deletion canal/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type EventHandler interface {
OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
// OnQueryEvent is query event include(create user,drop user,create index event,etd.)
OnQueryEvent(header *replication.EventHeader, queryEvent *replication.QueryEvent) error
String() string
}

Expand All @@ -38,7 +40,9 @@ func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.GTIDSet) erro
func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error {
return nil
}

func (h *DummyEventHandler) OnQueryEvent(header *replication.EventHeader, queryEvent *replication.QueryEvent) error {
return nil
}
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }

// `SetEventHandler` registers the sync handler, you must register your
Expand Down
61 changes: 45 additions & 16 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,22 +142,9 @@ func (c *Canal) runSyncBinlog() error {
continue
}
for _, stmt := range stmts {
nodes := parseStmt(stmt)
for _, node := range nodes {
if node.db == "" {
node.db = string(e.Schema)
}
if err = c.updateTable(ev.Header, node.db, node.table); err != nil {
return errors.Trace(err)
}
}
if len(nodes) > 0 {
savePos = true
force = true
// Now we only handle Table Changed DDL, maybe we will support more later.
if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
return errors.Trace(err)
}
err := c.handleQueryEvent(ev, e, stmt, pos, &savePos, &force)
if err != nil {
c.cfg.Logger.Errorf("handle query event(%s) err %v", e.Query, err)
}
}
if savePos && e.GSet != nil {
Expand Down Expand Up @@ -336,3 +323,45 @@ func (c *Canal) CatchMasterPos(timeout time.Duration) error {

return c.WaitUntilPos(pos, timeout)
}

// handleQueryEvent is handle some common query events (e.g., DDL,CREATE or DROP USER,GRANT),
// others use UnknownQueryEvent unified callbacks to expose to users
func (c *Canal) handleQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent,
stmt ast.StmtNode, pos mysql.Position, savePos, force *bool) error {
switch t := stmt.(type) {
case *ast.RenameTableStmt, *ast.AlterTableStmt, *ast.DropTableStmt, *ast.CreateTableStmt, *ast.TruncateTableStmt:
return c.handleDDLEvent(ev, e, t, pos, savePos, force)
default:
return c.handleUnknownQueryEvent(ev, e, t, pos, savePos, force)
}
}

func (c *Canal) handleDDLEvent(ev *replication.BinlogEvent, e *replication.QueryEvent,
stmt ast.StmtNode, pos mysql.Position, savePos, force *bool) error {
nodes := parseStmt(stmt)
for _, node := range nodes {
if node.db == "" {
node.db = string(e.Schema)
}
if err := c.updateTable(ev.Header, node.db, node.table); err != nil {
return errors.Trace(err)
}
}
if len(nodes) > 0 {
*savePos = true
*force = true
// Now we only handle Table Changed DDL, maybe we will support more later.
if err := c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
return errors.Trace(err)
}
}
return nil
}

func (c *Canal) handleUnknownQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent,
stmt ast.StmtNode, pos mysql.Position, savePos, force *bool) error {
if err := c.eventHandler.OnQueryEvent(ev.Header, e); err != nil {
return errors.Trace(err)
}
return nil
}

0 comments on commit 1b71baa

Please sign in to comment.