dump.go 4.9 KB


  1. package canal
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/go-mysql-org/go-mysql/mysql"
  9. "github.com/go-mysql-org/go-mysql/schema"
  10. "github.com/pingcap/errors"
  11. "github.com/shopspring/decimal"
  12. "github.com/siddontang/go-log/log"
  13. )
  14. type dumpParseHandler struct {
  15. c *Canal
  16. name string
  17. pos uint64
  18. gset mysql.GTIDSet
  19. }
  20. func (h *dumpParseHandler) BinLog(name string, pos uint64) error {
  21. h.name = name
  22. h.pos = pos
  23. return nil
  24. }
  25. func (h *dumpParseHandler) GtidSet(gtidsets string) (err error) {
  26. if h.gset != nil {
  27. err = h.gset.Update(gtidsets)
  28. } else {
  29. h.gset, err = mysql.ParseGTIDSet("mysql", gtidsets)
  30. }
  31. return err
  32. }
  33. func (h *dumpParseHandler) Data(db string, table string, values []string) error {
  34. if err := h.c.ctx.Err(); err != nil {
  35. return err
  36. }
  37. tableInfo, err := h.c.GetTable(db, table)
  38. if err != nil {
  39. e := errors.Cause(err)
  40. if e == ErrExcludedTable ||
  41. e == schema.ErrTableNotExist ||
  42. e == schema.ErrMissingTableMeta {
  43. return nil
  44. }
  45. log.Errorf("get %s.%s information err: %v", db, table, err)
  46. return errors.Trace(err)
  47. }
  48. vs := make([]interface{}, len(values))
  49. for i, v := range values {
  50. if v == "NULL" {
  51. vs[i] = nil
  52. } else if v == "_binary ''" {
  53. vs[i] = []byte{}
  54. } else if v[0] != '\'' {
  55. if tableInfo.Columns[i].Type == schema.TYPE_NUMBER || tableInfo.Columns[i].Type == schema.TYPE_MEDIUM_INT {
  56. var n interface{}
  57. var err error
  58. if tableInfo.Columns[i].IsUnsigned {
  59. n, err = strconv.ParseUint(v, 10, 64)
  60. } else {
  61. n, err = strconv.ParseInt(v, 10, 64)
  62. }
  63. if err != nil {
  64. return fmt.Errorf("parse row %v at %d error %v, int expected", values, i, err)
  65. }
  66. vs[i] = n
  67. } else if tableInfo.Columns[i].Type == schema.TYPE_FLOAT {
  68. f, err := strconv.ParseFloat(v, 64)
  69. if err != nil {
  70. return fmt.Errorf("parse row %v at %d error %v, float expected", values, i, err)
  71. }
  72. vs[i] = f
  73. } else if tableInfo.Columns[i].Type == schema.TYPE_DECIMAL {
  74. if h.c.cfg.UseDecimal {
  75. d, err := decimal.NewFromString(v)
  76. if err != nil {
  77. return fmt.Errorf("parse row %v at %d error %v, decimal expected", values, i, err)
  78. }
  79. vs[i] = d
  80. } else {
  81. f, err := strconv.ParseFloat(v, 64)
  82. if err != nil {
  83. return fmt.Errorf("parse row %v at %d error %v, float expected", values, i, err)
  84. }
  85. vs[i] = f
  86. }
  87. } else if strings.HasPrefix(v, "0x") {
  88. buf, err := hex.DecodeString(v[2:])
  89. if err != nil {
  90. return fmt.Errorf("parse row %v at %d error %v, hex literal expected", values, i, err)
  91. }
  92. vs[i] = string(buf)
  93. } else {
  94. return fmt.Errorf("parse row %v error, invalid type at %d", values, i)
  95. }
  96. } else {
  97. vs[i] = v[1 : len(v)-1]
  98. }
  99. }
  100. events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs}, nil)
  101. return h.c.eventHandler.OnRow(events)
  102. }
  103. func (c *Canal) AddDumpDatabases(dbs ...string) {
  104. if c.dumper == nil {
  105. return
  106. }
  107. c.dumper.AddDatabases(dbs...)
  108. }
  109. func (c *Canal) AddDumpTables(db string, tables ...string) {
  110. if c.dumper == nil {
  111. return
  112. }
  113. c.dumper.AddTables(db, tables...)
  114. }
  115. func (c *Canal) AddDumpIgnoreTables(db string, tables ...string) {
  116. if c.dumper == nil {
  117. return
  118. }
  119. c.dumper.AddIgnoreTables(db, tables...)
  120. }
  121. func (c *Canal) dump() error {
  122. if c.dumper == nil {
  123. return errors.New("mysqldump does not exist")
  124. }
  125. c.master.UpdateTimestamp(uint32(time.Now().Unix()))
  126. h := &dumpParseHandler{c: c}
  127. // If users call StartFromGTID with empty position to start dumping with gtid,
  128. // we record the current gtid position before dump starts.
  129. //
  130. // See tryDump() to see when dump is skipped.
  131. if c.master.GTIDSet() != nil {
  132. gset, err := c.GetMasterGTIDSet()
  133. if err != nil {
  134. return errors.Trace(err)
  135. }
  136. h.gset = gset
  137. }
  138. if c.cfg.Dump.SkipMasterData {
  139. pos, err := c.GetMasterPos()
  140. if err != nil {
  141. return errors.Trace(err)
  142. }
  143. log.Infof("skip master data, get current binlog position %v", pos)
  144. h.name = pos.Name
  145. h.pos = uint64(pos.Pos)
  146. }
  147. start := time.Now()
  148. log.Info("try dump MySQL and parse")
  149. if err := c.dumper.DumpAndParse(h); err != nil {
  150. return errors.Trace(err)
  151. }
  152. pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)}
  153. c.master.Update(pos)
  154. c.master.UpdateGTIDSet(h.gset)
  155. if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), true); err != nil {
  156. return errors.Trace(err)
  157. }
  158. var startPos fmt.Stringer = pos
  159. if h.gset != nil {
  160. c.master.UpdateGTIDSet(h.gset)
  161. startPos = h.gset
  162. }
  163. log.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s",
  164. time.Since(start).Seconds(), startPos)
  165. return nil
  166. }
  167. func (c *Canal) tryDump() error {
  168. pos := c.master.Position()
  169. gset := c.master.GTIDSet()
  170. if (len(pos.Name) > 0 && pos.Pos > 0) ||
  171. (gset != nil && gset.String() != "") {
  172. // we will sync with binlog name and position
  173. log.Infof("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset)
  174. return nil
  175. }
  176. if c.dumper == nil {
  177. log.Info("skip dump, no mysqldump")
  178. return nil
  179. }
  180. return c.dump()
  181. }