diff --git a/canal/canal.go b/canal/canal.go index 9745f11b3..2b9306738 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -49,7 +49,7 @@ type Canal struct { includeTableRegex []*regexp.Regexp excludeTableRegex []*regexp.Regexp - delay *uint32 + lastEventTimestamp *uint32 ctx context.Context cancel context.CancelFunc @@ -74,7 +74,7 @@ func NewCanal(cfg *Config) (*Canal, error) { } c.master = &masterInfo{} - c.delay = new(uint32) + c.lastEventTimestamp = new(uint32) var err error @@ -134,7 +134,7 @@ func (c *Canal) prepareDumper() error { } if c.dumper == nil { - //no mysqldump, use binlog only + // no mysqldump, use binlog only return nil } @@ -175,7 +175,7 @@ func (c *Canal) prepareDumper() error { } func (c *Canal) GetDelay() uint32 { - return atomic.LoadUint32(c.delay) + return uint32(time.Now().Unix()) - atomic.LoadUint32(c.lastEventTimestamp) } // Run will first try to dump all data from MySQL master `mysqldump`, diff --git a/canal/sync.go b/canal/sync.go index f4d2841b6..2e2cae5ec 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -239,12 +239,7 @@ func (c *Canal) updateTable(db, table string) (err error) { return } func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) { - var newDelay uint32 - now := uint32(time.Now().Unix()) - if now >= ev.Header.Timestamp { - newDelay = now - ev.Header.Timestamp - } - atomic.StoreUint32(c.delay, newDelay) + atomic.StoreUint32(c.lastEventTimestamp, ev.Header.Timestamp) } func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { diff --git a/mysql/result.go b/mysql/result.go index 797a4af75..f3c287743 100644 --- a/mysql/result.go +++ b/mysql/result.go @@ -19,3 +19,17 @@ func (r *Result) Close() { r.Resultset = nil } } + +func (r *Result) ChainResultSet(rs *Resultset) { + if r.Resultset == nil { + r.Resultset = rs + return + } + + var lastRS *Resultset + + for lastRS = r.Resultset; lastRS.Next != nil; lastRS = lastRS.Next { + } + + lastRS.Next = rs +} diff --git a/mysql/resultset.go b/mysql/resultset.go index f244b7d06..42673c976 100644 --- a/mysql/resultset.go +++ b/mysql/resultset.go @@ -17,6 +17,10 @@ type Resultset struct { RawPkg []byte RowDatas []RowData + + // In the case of multiple queries, we will have there a chaining list of separated Resultset + Next *Resultset + filled bool } var ( @@ -266,3 +270,7 @@ func (r *Resultset) GetStringByName(row int, name string) (string, error) { return r.GetString(row, column) } } + +func (r *Resultset) IsFilled() bool { + return r.filled +} diff --git a/mysql/resultset_helper.go b/mysql/resultset_helper.go index 3c22c9c4f..c6ccc9091 100644 --- a/mysql/resultset_helper.go +++ b/mysql/resultset_helper.go @@ -123,6 +123,7 @@ func BuildSimpleTextResultset(names []string, values [][]interface{}) (*Resultse r := new(Resultset) r.Fields = make([]*Field, len(names)) + r.filled = true var b []byte @@ -189,6 +190,7 @@ func BuildSimpleBinaryResultset(names []string, values [][]interface{}) (*Result r := new(Resultset) r.Fields = make([]*Field, len(names)) + r.filled = true var b []byte diff --git a/server/resp.go b/server/resp.go index d8051c698..98f3dad40 100644 --- a/server/resp.go +++ b/server/resp.go @@ -181,7 +181,27 @@ func (c *Conn) writeValue(value interface{}) error { return c.writeOK(nil) case *Result: if v != nil && v.Resultset != nil { - return c.writeResultset(v.Resultset) + for rs := v.Resultset; rs != nil; rs = rs.Next { + var err error + + if rs.Next != nil { + c.status |= SERVER_MORE_RESULTS_EXISTS + } + + if !rs.IsFilled() { + err = c.writeOK(v) + } else { + err = c.writeResultset(rs) + } + + c.status &= ^SERVER_MORE_RESULTS_EXISTS + + if err != nil { + return err + } + } + + return nil } else { return c.writeOK(v) }