Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added query event in dummy event handler #791

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions canal/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func TestCreateTableExp(t *testing.T) {
t.Fatalf("TestCreateTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestAlterTableExp(t *testing.T) {
t.Fatalf("TestAlterTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestRenameTableExp(t *testing.T) {
t.Fatalf("TestRenameTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestDropTableExp(t *testing.T) {
t.Fatalf("TestDropTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down Expand Up @@ -329,7 +329,7 @@ func TestWithoutSchemeExp(t *testing.T) {
t.Fatalf("TestCreateTableExp:case %s failed\n", s.Query)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down
14 changes: 11 additions & 3 deletions canal/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package canal
import (
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/pingcap/tidb/parser/ast"
)

type EventHandler interface {
Expand All @@ -17,18 +18,22 @@ type EventHandler interface {
OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
// OnQueryEvent is query event include (create user, drop user, create index, etc.)
// Note: the OnQueryEvent has lower priority than OnDDL even
OnQueryEvent(header *replication.EventHeader, stmt ast.StmtNode, pos mysql.Position, e *replication.QueryEvent) (bool, bool, error)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use named return in this interface, or explain what's the meaning of two bools in comments. So users of this library will understand when they implement this interface.

String() string
}

type DummyEventHandler struct {
}
type DummyEventHandler struct{}

func (h *DummyEventHandler) OnRotate(*replication.EventHeader, *replication.RotateEvent) error {
return nil
}

func (h *DummyEventHandler) OnTableChanged(*replication.EventHeader, string, string) error {
return nil
}

func (h *DummyEventHandler) OnDDL(*replication.EventHeader, mysql.Position, *replication.QueryEvent) error {
return nil
}
Expand All @@ -38,7 +43,10 @@ func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.GTIDSet) erro
func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error {
return nil
}

func (h *DummyEventHandler) OnQueryEvent(*replication.EventHeader, ast.StmtNode, mysql.Position, *replication.QueryEvent) (bool, bool, error) {
savePos, force := false, false
return savePos, force, nil
}
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }

// `SetEventHandler` registers the sync handler, you must register your
Expand Down
53 changes: 40 additions & 13 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,18 @@ func (c *Canal) runSyncBinlog() error {
continue
}
for _, stmt := range stmts {
nodes := parseStmt(stmt)
for _, node := range nodes {
if node.db == "" {
node.db = string(e.Schema)
}
if err = c.updateTable(ev.Header, node.db, node.table); err != nil {
return errors.Trace(err)
}
}
nodes := parseDDLStmt(stmt)
if len(nodes) > 0 {
savePos = true
force = true
// Now we only handle Table Changed DDL, maybe we will support more later.
if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
return errors.Trace(err)
err := c.handleDDLEvent(ev, e, nodes, pos)
if err != nil {
c.cfg.Logger.Errorf("handle ddl event err %v", err)
}
} else {
savePos, force, err = c.handleQueryEvent(ev.Header, stmt, pos, e)
if err != nil {
c.cfg.Logger.Errorf("handle query event err %v", err)
}
}
}
Expand All @@ -183,7 +180,7 @@ type node struct {
table string
}

func parseStmt(stmt ast.StmtNode) (ns []*node) {
func parseDDLStmt(stmt ast.StmtNode) (ns []*node) {
switch t := stmt.(type) {
case *ast.RenameTableStmt:
for _, tableInfo := range t.TableToTables {
Expand Down Expand Up @@ -231,6 +228,7 @@ func (c *Canal) updateTable(header *replication.EventHeader, db, table string) (
}
return
}

func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) {
var newDelay uint32
now := uint32(time.Now().Unix())
Expand Down Expand Up @@ -336,3 +334,32 @@ func (c *Canal) CatchMasterPos(timeout time.Duration) error {

return c.WaitUntilPos(pos, timeout)
}

// handleDDLEvent is handle DDL event
func (c *Canal) handleDDLEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, nodes []*node, pos mysql.Position) error {
for _, node := range nodes {
if node.db == "" {
node.db = string(e.Schema)
}
if err := c.updateTable(ev.Header, node.db, node.table); err != nil {
return errors.Trace(err)
}
}
if len(nodes) > 0 {
axfor marked this conversation as resolved.
Show resolved Hide resolved
// Now we only handle Table Changed DDL, maybe we will support more later.
if err := c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
return errors.Trace(err)
}
}
return nil
}

// handleQueryEvent is handle some common query events (e.g., DDL,CREATE or DROP USER,GRANT)
// DDL event use handleDDLEvent, others use the handleQueryEvent
func (c *Canal) handleQueryEvent(header *replication.EventHeader, stmt ast.StmtNode, pos mysql.Position, e *replication.QueryEvent) (bool, bool, error) {
savePos, force, err := c.eventHandler.OnQueryEvent(header, stmt, pos, e)
if err != nil {
return savePos, force, errors.Trace(err)
}
return savePos, force, nil
Comment on lines +361 to +364
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if err != nil {
return savePos, force, errors.Trace(err)
}
return savePos, force, nil
return savePos, force, errors.Trace(err)

}
10 changes: 6 additions & 4 deletions client/resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result,
return nil
}

func (c *Conn) readResultColumns(result *Result) (err error) {
func (c *Conn) readResultColumns(result *Result) error {
var i = 0
var data []byte

var err error
for {
rawPkgLen := len(result.RawPkg)
result.RawPkg, err = c.ReadPacketReuseMem(result.RawPkg)
Expand Down Expand Up @@ -378,8 +378,9 @@ func (c *Conn) readResultColumns(result *Result) (err error) {
}
}

func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
func (c *Conn) readResultRows(result *Result, isBinary bool) error {
var data []byte
var err error

for {
rawPkgLen := len(result.RawPkg)
Expand Down Expand Up @@ -425,10 +426,11 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
return nil
}

func (c *Conn) readResultRowsStreaming(result *Result, isBinary bool, perRowCb SelectPerRowCallback) (err error) {
func (c *Conn) readResultRowsStreaming(result *Result, isBinary bool, perRowCb SelectPerRowCallback) error {
var (
data []byte
row []FieldValue
err error
)

for {
Expand Down
3 changes: 2 additions & 1 deletion mysql/field.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ const (
FieldValueTypeString
)

func (f *Field) Parse(p FieldData) (err error) {
func (f *Field) Parse(p FieldData) error {
f.Data = p

var n int
var err error
pos := 0
//skip catelog, always def
n, err = SkipLengthEncodedString(p)
Expand Down