canal.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. package canal
  2. import (
  3. "context"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "regexp"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/go-mysql-org/go-mysql/client"
  14. "github.com/go-mysql-org/go-mysql/dump"
  15. "github.com/go-mysql-org/go-mysql/mysql"
  16. "github.com/go-mysql-org/go-mysql/replication"
  17. "github.com/go-mysql-org/go-mysql/schema"
  18. "github.com/pingcap/errors"
  19. "github.com/pingcap/parser"
  20. "github.com/siddontang/go-log/log"
  21. )
  22. // Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc...
  23. // MySQL must open row format for binlog
  24. type Canal struct {
  25. m sync.Mutex
  26. cfg *Config
  27. parser *parser.Parser
  28. master *masterInfo
  29. dumper *dump.Dumper
  30. dumped bool
  31. dumpDoneCh chan struct{}
  32. syncer *replication.BinlogSyncer
  33. eventHandler EventHandler
  34. connLock sync.Mutex
  35. conn *client.Conn
  36. tableLock sync.RWMutex
  37. tables map[string]*schema.Table
  38. errorTablesGetTime map[string]time.Time
  39. tableMatchCache map[string]bool
  40. includeTableRegex []*regexp.Regexp
  41. excludeTableRegex []*regexp.Regexp
  42. delay *uint32
  43. ctx context.Context
  44. cancel context.CancelFunc
  45. }
  var (
  	// UnknownTableRetryPeriod is how long canal waits before it retries
  	// fetching the meta of a table whose previous lookup failed.
  	UnknownTableRetryPeriod = 10 * time.Second

  	// ErrExcludedTable is returned by GetTable for a table that is rejected
  	// by the include/exclude table filters.
  	ErrExcludedTable = errors.New("excluded table meta")
  )
  49. func NewCanal(cfg *Config) (*Canal, error) {
  50. c := new(Canal)
  51. c.cfg = cfg
  52. c.ctx, c.cancel = context.WithCancel(context.Background())
  53. c.dumpDoneCh = make(chan struct{})
  54. c.eventHandler = &DummyEventHandler{}
  55. c.parser = parser.New()
  56. c.tables = make(map[string]*schema.Table)
  57. if c.cfg.DiscardNoMetaRowEvent {
  58. c.errorTablesGetTime = make(map[string]time.Time)
  59. }
  60. c.master = &masterInfo{}
  61. c.delay = new(uint32)
  62. var err error
  63. if err = c.prepareDumper(); err != nil {
  64. return nil, errors.Trace(err)
  65. }
  66. if err = c.prepareSyncer(); err != nil {
  67. return nil, errors.Trace(err)
  68. }
  69. if err := c.checkBinlogRowFormat(); err != nil {
  70. return nil, errors.Trace(err)
  71. }
  72. // init table filter
  73. if n := len(c.cfg.IncludeTableRegex); n > 0 {
  74. c.includeTableRegex = make([]*regexp.Regexp, n)
  75. for i, val := range c.cfg.IncludeTableRegex {
  76. reg, err := regexp.Compile(val)
  77. if err != nil {
  78. return nil, errors.Trace(err)
  79. }
  80. c.includeTableRegex[i] = reg
  81. }
  82. }
  83. if n := len(c.cfg.ExcludeTableRegex); n > 0 {
  84. c.excludeTableRegex = make([]*regexp.Regexp, n)
  85. for i, val := range c.cfg.ExcludeTableRegex {
  86. reg, err := regexp.Compile(val)
  87. if err != nil {
  88. return nil, errors.Trace(err)
  89. }
  90. c.excludeTableRegex[i] = reg
  91. }
  92. }
  93. if c.includeTableRegex != nil || c.excludeTableRegex != nil {
  94. c.tableMatchCache = make(map[string]bool)
  95. }
  96. return c, nil
  97. }
  98. func (c *Canal) prepareDumper() error {
  99. var err error
  100. dumpPath := c.cfg.Dump.ExecutionPath
  101. if len(dumpPath) == 0 {
  102. // ignore mysqldump, use binlog only
  103. return nil
  104. }
  105. if c.dumper, err = dump.NewDumper(dumpPath,
  106. c.cfg.Addr, c.cfg.User, c.cfg.Password); err != nil {
  107. return errors.Trace(err)
  108. }
  109. if c.dumper == nil {
  110. //no mysqldump, use binlog only
  111. return nil
  112. }
  113. dbs := c.cfg.Dump.Databases
  114. tables := c.cfg.Dump.Tables
  115. tableDB := c.cfg.Dump.TableDB
  116. if len(tables) == 0 {
  117. c.dumper.AddDatabases(dbs...)
  118. } else {
  119. c.dumper.AddTables(tableDB, tables...)
  120. }
  121. charset := c.cfg.Charset
  122. c.dumper.SetCharset(charset)
  123. c.dumper.SetWhere(c.cfg.Dump.Where)
  124. c.dumper.SkipMasterData(c.cfg.Dump.SkipMasterData)
  125. c.dumper.SetMaxAllowedPacket(c.cfg.Dump.MaxAllowedPacketMB)
  126. c.dumper.SetProtocol(c.cfg.Dump.Protocol)
  127. c.dumper.SetExtraOptions(c.cfg.Dump.ExtraOptions)
  128. // Use hex blob for mysqldump
  129. c.dumper.SetHexBlob(true)
  130. for _, ignoreTable := range c.cfg.Dump.IgnoreTables {
  131. if seps := strings.Split(ignoreTable, ","); len(seps) == 2 {
  132. c.dumper.AddIgnoreTables(seps[0], seps[1])
  133. }
  134. }
  135. if c.cfg.Dump.DiscardErr {
  136. c.dumper.SetErrOut(ioutil.Discard)
  137. } else {
  138. c.dumper.SetErrOut(os.Stderr)
  139. }
  140. return nil
  141. }
  142. func (c *Canal) GetDelay() uint32 {
  143. return atomic.LoadUint32(c.delay)
  144. }
  145. // Run will first try to dump all data from MySQL master `mysqldump`,
  146. // then sync from the binlog position in the dump data.
  147. // It will run forever until meeting an error or Canal closed.
  148. func (c *Canal) Run() error {
  149. return c.run()
  150. }
  151. // RunFrom will sync from the binlog position directly, ignore mysqldump.
  152. func (c *Canal) RunFrom(pos mysql.Position) error {
  153. c.master.Update(pos)
  154. return c.Run()
  155. }
  156. func (c *Canal) StartFromGTID(set mysql.GTIDSet) error {
  157. c.master.UpdateGTIDSet(set)
  158. return c.Run()
  159. }
  160. // Dump all data from MySQL master `mysqldump`, ignore sync binlog.
  161. func (c *Canal) Dump() error {
  162. if c.dumped {
  163. return errors.New("the method Dump can't be called twice")
  164. }
  165. c.dumped = true
  166. defer close(c.dumpDoneCh)
  167. return c.dump()
  168. }
  169. func (c *Canal) run() error {
  170. defer func() {
  171. c.cancel()
  172. }()
  173. c.master.UpdateTimestamp(uint32(time.Now().Unix()))
  174. if !c.dumped {
  175. c.dumped = true
  176. err := c.tryDump()
  177. close(c.dumpDoneCh)
  178. if err != nil {
  179. log.Errorf("canal dump mysql err: %v", err)
  180. return errors.Trace(err)
  181. }
  182. }
  183. if err := c.runSyncBinlog(); err != nil {
  184. if errors.Cause(err) != context.Canceled {
  185. log.Errorf("canal start sync binlog err: %v", err)
  186. return errors.Trace(err)
  187. }
  188. }
  189. return nil
  190. }
  191. func (c *Canal) Close() {
  192. log.Infof("closing canal")
  193. c.m.Lock()
  194. defer c.m.Unlock()
  195. c.cancel()
  196. c.syncer.Close()
  197. c.connLock.Lock()
  198. c.conn.Close()
  199. c.conn = nil
  200. c.connLock.Unlock()
  201. _ = c.eventHandler.OnPosSynced(c.master.Position(), c.master.GTIDSet(), true)
  202. }
  203. func (c *Canal) WaitDumpDone() <-chan struct{} {
  204. return c.dumpDoneCh
  205. }
  206. func (c *Canal) Ctx() context.Context {
  207. return c.ctx
  208. }
  209. func (c *Canal) checkTableMatch(key string) bool {
  210. // no filter, return true
  211. if c.tableMatchCache == nil {
  212. return true
  213. }
  214. c.tableLock.RLock()
  215. rst, ok := c.tableMatchCache[key]
  216. c.tableLock.RUnlock()
  217. if ok {
  218. // cache hit
  219. return rst
  220. }
  221. matchFlag := false
  222. // check include
  223. if c.includeTableRegex != nil {
  224. for _, reg := range c.includeTableRegex {
  225. if reg.MatchString(key) {
  226. matchFlag = true
  227. break
  228. }
  229. }
  230. }
  231. // check exclude
  232. if matchFlag && c.excludeTableRegex != nil {
  233. for _, reg := range c.excludeTableRegex {
  234. if reg.MatchString(key) {
  235. matchFlag = false
  236. break
  237. }
  238. }
  239. }
  240. c.tableLock.Lock()
  241. c.tableMatchCache[key] = matchFlag
  242. c.tableLock.Unlock()
  243. return matchFlag
  244. }
  245. func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
  246. key := fmt.Sprintf("%s.%s", db, table)
  247. // if table is excluded, return error and skip parsing event or dump
  248. if !c.checkTableMatch(key) {
  249. return nil, ErrExcludedTable
  250. }
  251. c.tableLock.RLock()
  252. t, ok := c.tables[key]
  253. c.tableLock.RUnlock()
  254. if ok {
  255. return t, nil
  256. }
  257. if c.cfg.DiscardNoMetaRowEvent {
  258. c.tableLock.RLock()
  259. lastTime, ok := c.errorTablesGetTime[key]
  260. c.tableLock.RUnlock()
  261. if ok && time.Since(lastTime) < UnknownTableRetryPeriod {
  262. return nil, schema.ErrMissingTableMeta
  263. }
  264. }
  265. t, err := schema.NewTable(c, db, table)
  266. if err != nil {
  267. // check table not exists
  268. if ok, err1 := schema.IsTableExist(c, db, table); err1 == nil && !ok {
  269. return nil, schema.ErrTableNotExist
  270. }
  271. // work around : RDS HAHeartBeat
  272. // ref : https://github.com/alibaba/canal/blob/master/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L385
  273. // issue : https://github.com/alibaba/canal/issues/222
  274. // This is a common error in RDS that canal can't get HAHealthCheckSchema's meta, so we mock a table meta.
  275. // If canal just skip and log error, as RDS HA heartbeat interval is very short, so too many HAHeartBeat errors will be logged.
  276. if key == schema.HAHealthCheckSchema {
  277. // mock ha_health_check meta
  278. ta := &schema.Table{
  279. Schema: db,
  280. Name: table,
  281. Columns: make([]schema.TableColumn, 0, 2),
  282. Indexes: make([]*schema.Index, 0),
  283. }
  284. ta.AddColumn("id", "bigint(20)", "", "")
  285. ta.AddColumn("type", "char(1)", "", "")
  286. c.tableLock.Lock()
  287. c.tables[key] = ta
  288. c.tableLock.Unlock()
  289. return ta, nil
  290. }
  291. // if DiscardNoMetaRowEvent is true, we just log this error
  292. if c.cfg.DiscardNoMetaRowEvent {
  293. c.tableLock.Lock()
  294. c.errorTablesGetTime[key] = time.Now()
  295. c.tableLock.Unlock()
  296. // log error and return ErrMissingTableMeta
  297. log.Errorf("canal get table meta err: %v", errors.Trace(err))
  298. return nil, schema.ErrMissingTableMeta
  299. }
  300. return nil, err
  301. }
  302. c.tableLock.Lock()
  303. c.tables[key] = t
  304. if c.cfg.DiscardNoMetaRowEvent {
  305. // if get table info success, delete this key from errorTablesGetTime
  306. delete(c.errorTablesGetTime, key)
  307. }
  308. c.tableLock.Unlock()
  309. return t, nil
  310. }
  311. // ClearTableCache clear table cache
  312. func (c *Canal) ClearTableCache(db []byte, table []byte) {
  313. key := fmt.Sprintf("%s.%s", db, table)
  314. c.tableLock.Lock()
  315. delete(c.tables, key)
  316. if c.cfg.DiscardNoMetaRowEvent {
  317. delete(c.errorTablesGetTime, key)
  318. }
  319. c.tableLock.Unlock()
  320. }
  321. // CheckBinlogRowImage checks MySQL binlog row image, must be in FULL, MINIMAL, NOBLOB
  322. func (c *Canal) CheckBinlogRowImage(image string) error {
  323. // need to check MySQL binlog row image? full, minimal or noblob?
  324. // now only log
  325. if c.cfg.Flavor == mysql.MySQLFlavor {
  326. if res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'`); err != nil {
  327. return errors.Trace(err)
  328. } else {
  329. // MySQL has binlog row image from 5.6, so older will return empty
  330. rowImage, _ := res.GetString(0, 1)
  331. if rowImage != "" && !strings.EqualFold(rowImage, image) {
  332. return errors.Errorf("MySQL uses %s binlog row image, but we want %s", rowImage, image)
  333. }
  334. }
  335. }
  336. return nil
  337. }
  338. func (c *Canal) checkBinlogRowFormat() error {
  339. res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE 'binlog_format';`)
  340. if err != nil {
  341. return errors.Trace(err)
  342. } else if f, _ := res.GetString(0, 1); f != "ROW" {
  343. return errors.Errorf("binlog must ROW format, but %s now", f)
  344. }
  345. return nil
  346. }
  347. func (c *Canal) prepareSyncer() error {
  348. cfg := replication.BinlogSyncerConfig{
  349. ServerID: c.cfg.ServerID,
  350. Flavor: c.cfg.Flavor,
  351. User: c.cfg.User,
  352. Password: c.cfg.Password,
  353. Charset: c.cfg.Charset,
  354. HeartbeatPeriod: c.cfg.HeartbeatPeriod,
  355. ReadTimeout: c.cfg.ReadTimeout,
  356. UseDecimal: c.cfg.UseDecimal,
  357. ParseTime: c.cfg.ParseTime,
  358. SemiSyncEnabled: c.cfg.SemiSyncEnabled,
  359. MaxReconnectAttempts: c.cfg.MaxReconnectAttempts,
  360. DisableRetrySync: c.cfg.DisableRetrySync,
  361. TimestampStringLocation: c.cfg.TimestampStringLocation,
  362. TLSConfig: c.cfg.TLSConfig,
  363. }
  364. if strings.Contains(c.cfg.Addr, "/") {
  365. cfg.Host = c.cfg.Addr
  366. } else {
  367. seps := strings.Split(c.cfg.Addr, ":")
  368. if len(seps) != 2 {
  369. return errors.Errorf("invalid mysql addr format %s, must host:port", c.cfg.Addr)
  370. }
  371. port, err := strconv.ParseUint(seps[1], 10, 16)
  372. if err != nil {
  373. return errors.Trace(err)
  374. }
  375. cfg.Host = seps[0]
  376. cfg.Port = uint16(port)
  377. }
  378. c.syncer = replication.NewBinlogSyncer(cfg)
  379. return nil
  380. }
  381. // Execute a SQL
  382. func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error) {
  383. c.connLock.Lock()
  384. defer c.connLock.Unlock()
  385. argF := make([]func(*client.Conn), 0)
  386. if c.cfg.TLSConfig != nil {
  387. argF = append(argF, func(conn *client.Conn) {
  388. conn.SetTLSConfig(c.cfg.TLSConfig)
  389. })
  390. }
  391. retryNum := 3
  392. for i := 0; i < retryNum; i++ {
  393. if c.conn == nil {
  394. c.conn, err = client.Connect(c.cfg.Addr, c.cfg.User, c.cfg.Password, "", argF...)
  395. if err != nil {
  396. return nil, errors.Trace(err)
  397. }
  398. }
  399. rr, err = c.conn.Execute(cmd, args...)
  400. if err != nil && !mysql.ErrorEqual(err, mysql.ErrBadConn) {
  401. return
  402. } else if mysql.ErrorEqual(err, mysql.ErrBadConn) {
  403. c.conn.Close()
  404. c.conn = nil
  405. continue
  406. } else {
  407. return
  408. }
  409. }
  410. return
  411. }
  412. func (c *Canal) SyncedPosition() mysql.Position {
  413. return c.master.Position()
  414. }
  415. func (c *Canal) SyncedTimestamp() uint32 {
  416. return c.master.timestamp
  417. }
  418. func (c *Canal) SyncedGTIDSet() mysql.GTIDSet {
  419. return c.master.GTIDSet()
  420. }