sync.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. package canal
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. "time"
  6. "github.com/go-mysql-org/go-mysql/mysql"
  7. "github.com/go-mysql-org/go-mysql/replication"
  8. "github.com/go-mysql-org/go-mysql/schema"
  9. "github.com/google/uuid"
  10. "github.com/pingcap/errors"
  11. "github.com/pingcap/parser/ast"
  12. "github.com/siddontang/go-log/log"
  13. )
  14. func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
  15. gset := c.master.GTIDSet()
  16. if gset == nil || gset.String() == "" {
  17. pos := c.master.Position()
  18. s, err := c.syncer.StartSync(pos)
  19. if err != nil {
  20. return nil, errors.Errorf("start sync replication at binlog %v error %v", pos, err)
  21. }
  22. log.Infof("start sync binlog at binlog file %v", pos)
  23. return s, nil
  24. } else {
  25. gsetClone := gset.Clone()
  26. s, err := c.syncer.StartSyncGTID(gset)
  27. if err != nil {
  28. return nil, errors.Errorf("start sync replication at GTID set %v error %v", gset, err)
  29. }
  30. log.Infof("start sync binlog at GTID set %v", gsetClone)
  31. return s, nil
  32. }
  33. }
  34. func (c *Canal) runSyncBinlog() error {
  35. s, err := c.startSyncer()
  36. if err != nil {
  37. return err
  38. }
  39. savePos := false
  40. force := false
  41. // The name of the binlog file received in the fake rotate event.
  42. // It must be preserved until the new position is saved.
  43. fakeRotateLogName := ""
  44. for {
  45. ev, err := s.GetEvent(c.ctx)
  46. if err != nil {
  47. return errors.Trace(err)
  48. }
  49. // Update the delay between the Canal and the Master before the handler hooks are called
  50. c.updateReplicationDelay(ev)
  51. // If log pos equals zero then the received event is a fake rotate event and
  52. // contains only a name of the next binlog file
  53. // See https://github.com/mysql/mysql-server/blob/8e797a5d6eb3a87f16498edcb7261a75897babae/sql/rpl_binlog_sender.h#L235
  54. // and https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899
  55. if ev.Header.LogPos == 0 {
  56. switch e := ev.Event.(type) {
  57. case *replication.RotateEvent:
  58. fakeRotateLogName = string(e.NextLogName)
  59. log.Infof("received fake rotate event, next log name is %s", e.NextLogName)
  60. }
  61. continue
  62. }
  63. savePos = false
  64. force = false
  65. pos := c.master.Position()
  66. curPos := pos.Pos
  67. // next binlog pos
  68. pos.Pos = ev.Header.LogPos
  69. // new file name received in the fake rotate event
  70. if fakeRotateLogName != "" {
  71. pos.Name = fakeRotateLogName
  72. }
  73. // We only save position with RotateEvent and XIDEvent.
  74. // For RowsEvent, we can't save the position until meeting XIDEvent
  75. // which tells the whole transaction is over.
  76. // TODO: If we meet any DDL query, we must save too.
  77. switch e := ev.Event.(type) {
  78. case *replication.RotateEvent:
  79. pos.Name = string(e.NextLogName)
  80. pos.Pos = uint32(e.Position)
  81. log.Infof("rotate binlog to %s", pos)
  82. savePos = true
  83. force = true
  84. if err = c.eventHandler.OnRotate(e); err != nil {
  85. return errors.Trace(err)
  86. }
  87. case *replication.RowsEvent:
  88. // we only focus row based event
  89. err = c.handleRowsEvent(ev)
  90. if err != nil {
  91. e := errors.Cause(err)
  92. // if error is not ErrExcludedTable or ErrTableNotExist or ErrMissingTableMeta, stop canal
  93. if e != ErrExcludedTable &&
  94. e != schema.ErrTableNotExist &&
  95. e != schema.ErrMissingTableMeta {
  96. log.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
  97. return errors.Trace(err)
  98. }
  99. }
  100. continue
  101. case *replication.XIDEvent:
  102. savePos = true
  103. // try to save the position later
  104. if err := c.eventHandler.OnXID(pos); err != nil {
  105. return errors.Trace(err)
  106. }
  107. if e.GSet != nil {
  108. c.master.UpdateGTIDSet(e.GSet)
  109. }
  110. case *replication.MariadbGTIDEvent:
  111. // try to save the GTID later
  112. gtid, err := mysql.ParseMariadbGTIDSet(e.GTID.String())
  113. if err != nil {
  114. return errors.Trace(err)
  115. }
  116. if err := c.eventHandler.OnGTID(gtid); err != nil {
  117. return errors.Trace(err)
  118. }
  119. case *replication.GTIDEvent:
  120. u, _ := uuid.FromBytes(e.SID)
  121. gtid, err := mysql.ParseMysqlGTIDSet(fmt.Sprintf("%s:%d", u.String(), e.GNO))
  122. if err != nil {
  123. return errors.Trace(err)
  124. }
  125. if err := c.eventHandler.OnGTID(gtid); err != nil {
  126. return errors.Trace(err)
  127. }
  128. case *replication.QueryEvent:
  129. stmts, _, err := c.parser.Parse(string(e.Query), "", "")
  130. if err != nil {
  131. log.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
  132. continue
  133. }
  134. for _, stmt := range stmts {
  135. nodes := parseStmt(stmt)
  136. for _, node := range nodes {
  137. if node.db == "" {
  138. node.db = string(e.Schema)
  139. }
  140. if err = c.updateTable(node.db, node.table); err != nil {
  141. return errors.Trace(err)
  142. }
  143. }
  144. if len(nodes) > 0 {
  145. savePos = true
  146. force = true
  147. // Now we only handle Table Changed DDL, maybe we will support more later.
  148. if err = c.eventHandler.OnDDL(pos, e); err != nil {
  149. return errors.Trace(err)
  150. }
  151. }
  152. }
  153. if savePos && e.GSet != nil {
  154. c.master.UpdateGTIDSet(e.GSet)
  155. }
  156. default:
  157. continue
  158. }
  159. if savePos {
  160. c.master.Update(pos)
  161. c.master.UpdateTimestamp(ev.Header.Timestamp)
  162. fakeRotateLogName = ""
  163. if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), force); err != nil {
  164. return errors.Trace(err)
  165. }
  166. }
  167. }
  168. }
  // node names one table referenced by a parsed statement, as produced by
  // parseStmt.
  type node struct {
  	// db is the schema name taken from the statement; parseStmt leaves it
  	// empty when the statement does not qualify the table.
  	db string
  	// table is the table name taken from the statement.
  	table string
  }
  173. func parseStmt(stmt ast.StmtNode) (ns []*node) {
  174. switch t := stmt.(type) {
  175. case *ast.RenameTableStmt:
  176. for _, tableInfo := range t.TableToTables {
  177. n := &node{
  178. db: tableInfo.OldTable.Schema.String(),
  179. table: tableInfo.OldTable.Name.String(),
  180. }
  181. ns = append(ns, n)
  182. }
  183. case *ast.AlterTableStmt:
  184. n := &node{
  185. db: t.Table.Schema.String(),
  186. table: t.Table.Name.String(),
  187. }
  188. ns = []*node{n}
  189. case *ast.DropTableStmt:
  190. for _, table := range t.Tables {
  191. n := &node{
  192. db: table.Schema.String(),
  193. table: table.Name.String(),
  194. }
  195. ns = append(ns, n)
  196. }
  197. case *ast.CreateTableStmt:
  198. n := &node{
  199. db: t.Table.Schema.String(),
  200. table: t.Table.Name.String(),
  201. }
  202. ns = []*node{n}
  203. case *ast.TruncateTableStmt:
  204. n := &node{
  205. db: t.Table.Schema.String(),
  206. table: t.Table.Name.String(),
  207. }
  208. ns = []*node{n}
  209. }
  210. return ns
  211. }
  212. func (c *Canal) updateTable(db, table string) (err error) {
  213. c.ClearTableCache([]byte(db), []byte(table))
  214. log.Infof("table structure changed, clear table cache: %s.%s\n", db, table)
  215. if err = c.eventHandler.OnTableChanged(db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist {
  216. return errors.Trace(err)
  217. }
  218. return
  219. }
  220. func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) {
  221. var newDelay uint32
  222. now := uint32(time.Now().Unix())
  223. if now >= ev.Header.Timestamp {
  224. newDelay = now - ev.Header.Timestamp
  225. }
  226. atomic.StoreUint32(c.delay, newDelay)
  227. }
  228. func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
  229. ev := e.Event.(*replication.RowsEvent)
  230. // Caveat: table may be altered at runtime.
  231. schema := string(ev.Table.Schema)
  232. table := string(ev.Table.Table)
  233. t, err := c.GetTable(schema, table)
  234. if err != nil {
  235. return err
  236. }
  237. var action string
  238. switch e.Header.EventType {
  239. case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
  240. action = InsertAction
  241. case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
  242. action = DeleteAction
  243. case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
  244. action = UpdateAction
  245. default:
  246. return errors.Errorf("%s not supported now", e.Header.EventType)
  247. }
  248. events := newRowsEvent(t, action, ev.Rows, e.Header)
  249. return c.eventHandler.OnRow(events)
  250. }
  251. func (c *Canal) FlushBinlog() error {
  252. _, err := c.Execute("FLUSH BINARY LOGS")
  253. return errors.Trace(err)
  254. }
  255. func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
  256. timer := time.NewTimer(timeout)
  257. for {
  258. select {
  259. case <-timer.C:
  260. return errors.Errorf("wait position %v too long > %s", pos, timeout)
  261. default:
  262. err := c.FlushBinlog()
  263. if err != nil {
  264. return errors.Trace(err)
  265. }
  266. curPos := c.master.Position()
  267. if curPos.Compare(pos) >= 0 {
  268. return nil
  269. } else {
  270. log.Debugf("master pos is %v, wait catching %v", curPos, pos)
  271. time.Sleep(100 * time.Millisecond)
  272. }
  273. }
  274. }
  275. }
  276. func (c *Canal) GetMasterPos() (mysql.Position, error) {
  277. rr, err := c.Execute("SHOW MASTER STATUS")
  278. if err != nil {
  279. return mysql.Position{}, errors.Trace(err)
  280. }
  281. name, _ := rr.GetString(0, 0)
  282. pos, _ := rr.GetInt(0, 1)
  283. return mysql.Position{Name: name, Pos: uint32(pos)}, nil
  284. }
  285. func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error) {
  286. query := ""
  287. switch c.cfg.Flavor {
  288. case mysql.MariaDBFlavor:
  289. query = "SELECT @@GLOBAL.gtid_current_pos"
  290. default:
  291. query = "SELECT @@GLOBAL.GTID_EXECUTED"
  292. }
  293. rr, err := c.Execute(query)
  294. if err != nil {
  295. return nil, errors.Trace(err)
  296. }
  297. gx, err := rr.GetString(0, 0)
  298. if err != nil {
  299. return nil, errors.Trace(err)
  300. }
  301. gset, err := mysql.ParseGTIDSet(c.cfg.Flavor, gx)
  302. if err != nil {
  303. return nil, errors.Trace(err)
  304. }
  305. return gset, nil
  306. }
  307. func (c *Canal) CatchMasterPos(timeout time.Duration) error {
  308. pos, err := c.GetMasterPos()
  309. if err != nil {
  310. return errors.Trace(err)
  311. }
  312. return c.WaitUntilPos(pos, timeout)
  313. }