binlogsyncer.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875
  1. package replication
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "encoding/binary"
  6. "fmt"
  7. "net"
  8. "os"
  9. "sync"
  10. "time"
  11. "github.com/google/uuid"
  12. "github.com/pingcap/errors"
  13. "github.com/siddontang/go-log/log"
  14. "github.com/go-mysql-org/go-mysql/client"
  15. . "github.com/go-mysql-org/go-mysql/mysql"
  16. )
  17. var (
  18. errSyncRunning = errors.New("Sync is running, must Close first")
  19. )
  20. // BinlogSyncerConfig is the configuration for BinlogSyncer.
  21. type BinlogSyncerConfig struct {
  22. // ServerID is the unique ID in cluster.
  23. ServerID uint32
  24. // Flavor is "mysql" or "mariadb", if not set, use "mysql" default.
  25. Flavor string
  26. // Host is for MySQL server host.
  27. Host string
  28. // Port is for MySQL server port.
  29. Port uint16
  30. // User is for MySQL user.
  31. User string
  32. // Password is for MySQL password.
  33. Password string
  34. // Localhost is local hostname if register salve.
  35. // If not set, use os.Hostname() instead.
  36. Localhost string
  37. // Charset is for MySQL client character set
  38. Charset string
  39. // SemiSyncEnabled enables semi-sync or not.
  40. SemiSyncEnabled bool
  41. // RawModeEnabled is for not parsing binlog event.
  42. RawModeEnabled bool
  43. // If not nil, use the provided tls.Config to connect to the database using TLS/SSL.
  44. TLSConfig *tls.Config
  45. // Use replication.Time structure for timestamp and datetime.
  46. // We will use Local location for timestamp and UTC location for datatime.
  47. ParseTime bool
  48. // If ParseTime is false, convert TIMESTAMP into this specified timezone. If
  49. // ParseTime is true, this option will have no effect and TIMESTAMP data will
  50. // be parsed into the local timezone and a full time.Time struct will be
  51. // returned.
  52. //
  53. // Note that MySQL TIMESTAMP columns are offset from the machine local
  54. // timezone while DATETIME columns are offset from UTC. This is consistent
  55. // with documented MySQL behaviour as it return TIMESTAMP in local timezone
  56. // and DATETIME in UTC.
  57. //
  58. // Setting this to UTC effectively equalizes the TIMESTAMP and DATETIME time
  59. // strings obtained from MySQL.
  60. TimestampStringLocation *time.Location
  61. // Use decimal.Decimal structure for decimals.
  62. UseDecimal bool
  63. // RecvBufferSize sets the size in bytes of the operating system's receive buffer associated with the connection.
  64. RecvBufferSize int
  65. // master heartbeat period
  66. HeartbeatPeriod time.Duration
  67. // read timeout
  68. ReadTimeout time.Duration
  69. // maximum number of attempts to re-establish a broken connection, zero or negative number means infinite retry.
  70. // this configuration will not work if DisableRetrySync is true
  71. MaxReconnectAttempts int
  72. // whether disable re-sync for broken connection
  73. DisableRetrySync bool
  74. // Only works when MySQL/MariaDB variable binlog_checksum=CRC32.
  75. // For MySQL, binlog_checksum was introduced since 5.6.2, but CRC32 was set as default value since 5.6.6 .
  76. // https://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.html#option_mysqld_binlog-checksum
  77. // For MariaDB, binlog_checksum was introduced since MariaDB 5.3, but CRC32 was set as default value since MariaDB 10.2.1 .
  78. // https://mariadb.com/kb/en/library/replication-and-binary-log-server-system-variables/#binlog_checksum
  79. VerifyChecksum bool
  80. // DumpCommandFlag is used to send binglog dump command. Default 0, aka BINLOG_DUMP_NEVER_STOP.
  81. // For MySQL, BINLOG_DUMP_NEVER_STOP and BINLOG_DUMP_NON_BLOCK are available.
  82. // https://dev.mysql.com/doc/internals/en/com-binlog-dump.html#binlog-dump-non-block
  83. // For MariaDB, BINLOG_DUMP_NEVER_STOP, BINLOG_DUMP_NON_BLOCK and BINLOG_SEND_ANNOTATE_ROWS_EVENT are available.
  84. // https://mariadb.com/kb/en/library/com_binlog_dump/
  85. // https://mariadb.com/kb/en/library/annotate_rows_event/
  86. DumpCommandFlag uint16
  87. //Option function is used to set outside of BinlogSyncerConfig, between mysql connection and COM_REGISTER_SLAVE
  88. //For MariaDB: slave_gtid_ignore_duplicates、skip_replication、slave_until_gtid
  89. Option func(*client.Conn) error
  90. }
  91. // BinlogSyncer syncs binlog event from server.
  92. type BinlogSyncer struct {
  93. m sync.RWMutex
  94. cfg BinlogSyncerConfig
  95. c *client.Conn
  96. wg sync.WaitGroup
  97. parser *BinlogParser
  98. nextPos Position
  99. prevGset, currGset GTIDSet
  100. running bool
  101. ctx context.Context
  102. cancel context.CancelFunc
  103. lastConnectionID uint32
  104. retryCount int
  105. }
  106. // NewBinlogSyncer creates the BinlogSyncer with cfg.
  107. func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
  108. if cfg.ServerID == 0 {
  109. log.Fatal("can't use 0 as the server ID")
  110. }
  111. // Clear the Password to avoid outputing it in log.
  112. pass := cfg.Password
  113. cfg.Password = ""
  114. log.Infof("create BinlogSyncer with config %v", cfg)
  115. cfg.Password = pass
  116. b := new(BinlogSyncer)
  117. b.cfg = cfg
  118. b.parser = NewBinlogParser()
  119. b.parser.SetFlavor(cfg.Flavor)
  120. b.parser.SetRawMode(b.cfg.RawModeEnabled)
  121. b.parser.SetParseTime(b.cfg.ParseTime)
  122. b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation)
  123. b.parser.SetUseDecimal(b.cfg.UseDecimal)
  124. b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum)
  125. b.running = false
  126. b.ctx, b.cancel = context.WithCancel(context.Background())
  127. return b
  128. }
  129. // Close closes the BinlogSyncer.
  130. func (b *BinlogSyncer) Close() {
  131. b.m.Lock()
  132. defer b.m.Unlock()
  133. b.close()
  134. }
  135. func (b *BinlogSyncer) close() {
  136. if b.isClosed() {
  137. return
  138. }
  139. log.Info("syncer is closing...")
  140. b.running = false
  141. b.cancel()
  142. if b.c != nil {
  143. err := b.c.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
  144. if err != nil {
  145. log.Warnf(`could not set read deadline: %s`, err)
  146. }
  147. }
  148. // kill last connection id
  149. if b.lastConnectionID > 0 {
  150. // Use a new connection to kill the binlog syncer
  151. // because calling KILL from the same connection
  152. // doesn't actually disconnect it.
  153. c, err := b.newConnection()
  154. if err == nil {
  155. b.killConnection(c, b.lastConnectionID)
  156. c.Close()
  157. }
  158. }
  159. b.wg.Wait()
  160. if b.c != nil {
  161. b.c.Close()
  162. }
  163. log.Info("syncer is closed")
  164. }
  165. func (b *BinlogSyncer) isClosed() bool {
  166. select {
  167. case <-b.ctx.Done():
  168. return true
  169. default:
  170. return false
  171. }
  172. }
  173. func (b *BinlogSyncer) registerSlave() error {
  174. if b.c != nil {
  175. b.c.Close()
  176. }
  177. var err error
  178. b.c, err = b.newConnection()
  179. if err != nil {
  180. return errors.Trace(err)
  181. }
  182. if b.cfg.Option != nil {
  183. if err = b.cfg.Option(b.c); err != nil {
  184. return errors.Trace(err)
  185. }
  186. }
  187. if len(b.cfg.Charset) != 0 {
  188. if err = b.c.SetCharset(b.cfg.Charset); err != nil {
  189. return errors.Trace(err)
  190. }
  191. }
  192. //set read timeout
  193. if b.cfg.ReadTimeout > 0 {
  194. _ = b.c.SetReadDeadline(time.Now().Add(b.cfg.ReadTimeout))
  195. }
  196. if b.cfg.RecvBufferSize > 0 {
  197. if tcp, ok := b.c.Conn.Conn.(*net.TCPConn); ok {
  198. _ = tcp.SetReadBuffer(b.cfg.RecvBufferSize)
  199. }
  200. }
  201. // kill last connection id
  202. if b.lastConnectionID > 0 {
  203. b.killConnection(b.c, b.lastConnectionID)
  204. }
  205. // save last last connection id for kill
  206. b.lastConnectionID = b.c.GetConnectionID()
  207. //for mysql 5.6+, binlog has a crc32 checksum
  208. //before mysql 5.6, this will not work, don't matter.:-)
  209. if r, err := b.c.Execute("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'"); err != nil {
  210. return errors.Trace(err)
  211. } else {
  212. s, _ := r.GetString(0, 1)
  213. if s != "" {
  214. // maybe CRC32 or NONE
  215. // mysqlbinlog.cc use NONE, see its below comments:
  216. // Make a notice to the server that this client
  217. // is checksum-aware. It does not need the first fake Rotate
  218. // necessary checksummed.
  219. // That preference is specified below.
  220. if _, err = b.c.Execute(`SET @master_binlog_checksum='NONE'`); err != nil {
  221. return errors.Trace(err)
  222. }
  223. // if _, err = b.c.Execute(`SET @master_binlog_checksum=@@global.binlog_checksum`); err != nil {
  224. // return errors.Trace(err)
  225. // }
  226. }
  227. }
  228. if b.cfg.Flavor == MariaDBFlavor {
  229. // Refer https://github.com/alibaba/canal/wiki/BinlogChange(MariaDB5&10)
  230. // Tell the server that we understand GTIDs by setting our slave capability
  231. // to MARIA_SLAVE_CAPABILITY_GTID = 4 (MariaDB >= 10.0.1).
  232. if _, err := b.c.Execute("SET @mariadb_slave_capability=4"); err != nil {
  233. return errors.Errorf("failed to set @mariadb_slave_capability=4: %v", err)
  234. }
  235. }
  236. if b.cfg.HeartbeatPeriod > 0 {
  237. _, err = b.c.Execute(fmt.Sprintf("SET @master_heartbeat_period=%d;", b.cfg.HeartbeatPeriod))
  238. if err != nil {
  239. log.Errorf("failed to set @master_heartbeat_period=%d, err: %v", b.cfg.HeartbeatPeriod, err)
  240. return errors.Trace(err)
  241. }
  242. }
  243. if err = b.writeRegisterSlaveCommand(); err != nil {
  244. return errors.Trace(err)
  245. }
  246. if _, err = b.c.ReadOKPacket(); err != nil {
  247. return errors.Trace(err)
  248. }
  249. serverUUID, err := uuid.NewUUID()
  250. if err != nil {
  251. log.Errorf("failed to get new uud %v", err)
  252. return errors.Trace(err)
  253. }
  254. if _, err = b.c.Execute(fmt.Sprintf("SET @slave_uuid = '%s', @replica_uuid = '%s'", serverUUID, serverUUID)); err != nil {
  255. log.Errorf("failed to set @slave_uuid = '%s', err: %v", serverUUID, err)
  256. return errors.Trace(err)
  257. }
  258. return nil
  259. }
  260. func (b *BinlogSyncer) enableSemiSync() error {
  261. if !b.cfg.SemiSyncEnabled {
  262. return nil
  263. }
  264. if r, err := b.c.Execute("SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled';"); err != nil {
  265. return errors.Trace(err)
  266. } else {
  267. s, _ := r.GetString(0, 1)
  268. if s != "ON" {
  269. log.Errorf("master does not support semi synchronous replication, use no semi-sync")
  270. b.cfg.SemiSyncEnabled = false
  271. return nil
  272. }
  273. }
  274. _, err := b.c.Execute(`SET @rpl_semi_sync_slave = 1;`)
  275. if err != nil {
  276. return errors.Trace(err)
  277. }
  278. return nil
  279. }
  280. func (b *BinlogSyncer) prepare() error {
  281. if b.isClosed() {
  282. return errors.Trace(ErrSyncClosed)
  283. }
  284. if err := b.registerSlave(); err != nil {
  285. return errors.Trace(err)
  286. }
  287. if err := b.enableSemiSync(); err != nil {
  288. return errors.Trace(err)
  289. }
  290. return nil
  291. }
  292. func (b *BinlogSyncer) startDumpStream() *BinlogStreamer {
  293. b.running = true
  294. s := newBinlogStreamer()
  295. b.wg.Add(1)
  296. go b.onStream(s)
  297. return s
  298. }
  299. // GetNextPosition returns the next position of the syncer
  300. func (b *BinlogSyncer) GetNextPosition() Position {
  301. return b.nextPos
  302. }
  303. // StartSync starts syncing from the `pos` position.
  304. func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
  305. log.Infof("begin to sync binlog from position %s", pos)
  306. b.m.Lock()
  307. defer b.m.Unlock()
  308. if b.running {
  309. return nil, errors.Trace(errSyncRunning)
  310. }
  311. if err := b.prepareSyncPos(pos); err != nil {
  312. return nil, errors.Trace(err)
  313. }
  314. return b.startDumpStream(), nil
  315. }
  316. // StartSyncGTID starts syncing from the `gset` GTIDSet.
  317. func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
  318. log.Infof("begin to sync binlog from GTID set %s", gset)
  319. b.prevGset = gset
  320. b.m.Lock()
  321. defer b.m.Unlock()
  322. if b.running {
  323. return nil, errors.Trace(errSyncRunning)
  324. }
  325. // establishing network connection here and will start getting binlog events from "gset + 1", thus until first
  326. // MariadbGTIDEvent/GTIDEvent event is received - we effectively do not have a "current GTID"
  327. b.currGset = nil
  328. if err := b.prepare(); err != nil {
  329. return nil, errors.Trace(err)
  330. }
  331. var err error
  332. switch b.cfg.Flavor {
  333. case MariaDBFlavor:
  334. err = b.writeBinlogDumpMariadbGTIDCommand(gset)
  335. default:
  336. // default use MySQL
  337. err = b.writeBinlogDumpMysqlGTIDCommand(gset)
  338. }
  339. if err != nil {
  340. return nil, err
  341. }
  342. return b.startDumpStream(), nil
  343. }
  344. func (b *BinlogSyncer) writeBinlogDumpCommand(p Position) error {
  345. b.c.ResetSequence()
  346. data := make([]byte, 4+1+4+2+4+len(p.Name))
  347. pos := 4
  348. data[pos] = COM_BINLOG_DUMP
  349. pos++
  350. binary.LittleEndian.PutUint32(data[pos:], p.Pos)
  351. pos += 4
  352. binary.LittleEndian.PutUint16(data[pos:], b.cfg.DumpCommandFlag)
  353. pos += 2
  354. binary.LittleEndian.PutUint32(data[pos:], b.cfg.ServerID)
  355. pos += 4
  356. copy(data[pos:], p.Name)
  357. return b.c.WritePacket(data)
  358. }
  359. func (b *BinlogSyncer) writeBinlogDumpMysqlGTIDCommand(gset GTIDSet) error {
  360. p := Position{Name: "", Pos: 4}
  361. gtidData := gset.Encode()
  362. b.c.ResetSequence()
  363. data := make([]byte, 4+1+2+4+4+len(p.Name)+8+4+len(gtidData))
  364. pos := 4
  365. data[pos] = COM_BINLOG_DUMP_GTID
  366. pos++
  367. binary.LittleEndian.PutUint16(data[pos:], 0)
  368. pos += 2
  369. binary.LittleEndian.PutUint32(data[pos:], b.cfg.ServerID)
  370. pos += 4
  371. binary.LittleEndian.PutUint32(data[pos:], uint32(len(p.Name)))
  372. pos += 4
  373. n := copy(data[pos:], p.Name)
  374. pos += n
  375. binary.LittleEndian.PutUint64(data[pos:], uint64(p.Pos))
  376. pos += 8
  377. binary.LittleEndian.PutUint32(data[pos:], uint32(len(gtidData)))
  378. pos += 4
  379. n = copy(data[pos:], gtidData)
  380. pos += n
  381. data = data[0:pos]
  382. return b.c.WritePacket(data)
  383. }
  384. func (b *BinlogSyncer) writeBinlogDumpMariadbGTIDCommand(gset GTIDSet) error {
  385. // Copy from vitess
  386. startPos := gset.String()
  387. // Set the slave_connect_state variable before issuing COM_BINLOG_DUMP to
  388. // provide the start position in GTID form.
  389. query := fmt.Sprintf("SET @slave_connect_state='%s'", startPos)
  390. if _, err := b.c.Execute(query); err != nil {
  391. return errors.Errorf("failed to set @slave_connect_state='%s': %v", startPos, err)
  392. }
  393. // Real slaves set this upon connecting if their gtid_strict_mode option was
  394. // enabled. We always use gtid_strict_mode because we need it to make our
  395. // internal GTID comparisons safe.
  396. if _, err := b.c.Execute("SET @slave_gtid_strict_mode=1"); err != nil {
  397. return errors.Errorf("failed to set @slave_gtid_strict_mode=1: %v", err)
  398. }
  399. // Since we use @slave_connect_state, the file and position here are ignored.
  400. return b.writeBinlogDumpCommand(Position{Name: "", Pos: 0})
  401. }
  402. // localHostname returns the hostname that register slave would register as.
  403. func (b *BinlogSyncer) localHostname() string {
  404. if len(b.cfg.Localhost) == 0 {
  405. h, _ := os.Hostname()
  406. return h
  407. }
  408. return b.cfg.Localhost
  409. }
  410. func (b *BinlogSyncer) writeRegisterSlaveCommand() error {
  411. b.c.ResetSequence()
  412. hostname := b.localHostname()
  413. // This should be the name of slave host not the host we are connecting to.
  414. data := make([]byte, 4+1+4+1+len(hostname)+1+len(b.cfg.User)+1+len(b.cfg.Password)+2+4+4)
  415. pos := 4
  416. data[pos] = COM_REGISTER_SLAVE
  417. pos++
  418. binary.LittleEndian.PutUint32(data[pos:], b.cfg.ServerID)
  419. pos += 4
  420. // This should be the name of slave hostname not the host we are connecting to.
  421. data[pos] = uint8(len(hostname))
  422. pos++
  423. n := copy(data[pos:], hostname)
  424. pos += n
  425. data[pos] = uint8(len(b.cfg.User))
  426. pos++
  427. n = copy(data[pos:], b.cfg.User)
  428. pos += n
  429. data[pos] = uint8(len(b.cfg.Password))
  430. pos++
  431. n = copy(data[pos:], b.cfg.Password)
  432. pos += n
  433. binary.LittleEndian.PutUint16(data[pos:], b.cfg.Port)
  434. pos += 2
  435. //replication rank, not used
  436. binary.LittleEndian.PutUint32(data[pos:], 0)
  437. pos += 4
  438. // master ID, 0 is OK
  439. binary.LittleEndian.PutUint32(data[pos:], 0)
  440. return b.c.WritePacket(data)
  441. }
  442. func (b *BinlogSyncer) replySemiSyncACK(p Position) error {
  443. b.c.ResetSequence()
  444. data := make([]byte, 4+1+8+len(p.Name))
  445. pos := 4
  446. // semi sync indicator
  447. data[pos] = SemiSyncIndicator
  448. pos++
  449. binary.LittleEndian.PutUint64(data[pos:], uint64(p.Pos))
  450. pos += 8
  451. copy(data[pos:], p.Name)
  452. err := b.c.WritePacket(data)
  453. if err != nil {
  454. return errors.Trace(err)
  455. }
  456. return nil
  457. }
  458. func (b *BinlogSyncer) retrySync() error {
  459. b.m.Lock()
  460. defer b.m.Unlock()
  461. b.parser.Reset()
  462. if b.prevGset != nil {
  463. msg := fmt.Sprintf("begin to re-sync from %s", b.prevGset.String())
  464. if b.currGset != nil {
  465. msg = fmt.Sprintf("%v (last read GTID=%v)", msg, b.currGset)
  466. }
  467. log.Infof(msg)
  468. if err := b.prepareSyncGTID(b.prevGset); err != nil {
  469. return errors.Trace(err)
  470. }
  471. } else {
  472. log.Infof("begin to re-sync from %s", b.nextPos)
  473. if err := b.prepareSyncPos(b.nextPos); err != nil {
  474. return errors.Trace(err)
  475. }
  476. }
  477. return nil
  478. }
  479. func (b *BinlogSyncer) prepareSyncPos(pos Position) error {
  480. // always start from position 4
  481. if pos.Pos < 4 {
  482. pos.Pos = 4
  483. }
  484. if err := b.prepare(); err != nil {
  485. return errors.Trace(err)
  486. }
  487. if err := b.writeBinlogDumpCommand(pos); err != nil {
  488. return errors.Trace(err)
  489. }
  490. return nil
  491. }
  492. func (b *BinlogSyncer) prepareSyncGTID(gset GTIDSet) error {
  493. var err error
  494. // re establishing network connection here and will start getting binlog events from "gset + 1", thus until first
  495. // MariadbGTIDEvent/GTIDEvent event is received - we effectively do not have a "current GTID"
  496. b.currGset = nil
  497. if err = b.prepare(); err != nil {
  498. return errors.Trace(err)
  499. }
  500. switch b.cfg.Flavor {
  501. case MariaDBFlavor:
  502. err = b.writeBinlogDumpMariadbGTIDCommand(gset)
  503. default:
  504. // default use MySQL
  505. err = b.writeBinlogDumpMysqlGTIDCommand(gset)
  506. }
  507. if err != nil {
  508. return err
  509. }
  510. return nil
  511. }
  512. func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
  513. defer func() {
  514. if e := recover(); e != nil {
  515. s.closeWithError(fmt.Errorf("Err: %v\n Stack: %s", e, Pstack()))
  516. }
  517. b.wg.Done()
  518. }()
  519. for {
  520. data, err := b.c.ReadPacket()
  521. select {
  522. case <-b.ctx.Done():
  523. s.close()
  524. return
  525. default:
  526. }
  527. if err != nil {
  528. log.Error(err)
  529. // we meet connection error, should re-connect again with
  530. // last nextPos or nextGTID we got.
  531. if len(b.nextPos.Name) == 0 && b.prevGset == nil {
  532. // we can't get the correct position, close.
  533. s.closeWithError(err)
  534. return
  535. }
  536. if b.cfg.DisableRetrySync {
  537. log.Warn("retry sync is disabled")
  538. s.closeWithError(err)
  539. return
  540. }
  541. for {
  542. select {
  543. case <-b.ctx.Done():
  544. s.close()
  545. return
  546. case <-time.After(time.Second):
  547. b.retryCount++
  548. if err = b.retrySync(); err != nil {
  549. if b.cfg.MaxReconnectAttempts > 0 && b.retryCount >= b.cfg.MaxReconnectAttempts {
  550. log.Errorf("retry sync err: %v, exceeded max retries (%d)", err, b.cfg.MaxReconnectAttempts)
  551. s.closeWithError(err)
  552. return
  553. }
  554. log.Errorf("retry sync err: %v, wait 1s and retry again", err)
  555. continue
  556. }
  557. }
  558. break
  559. }
  560. // we connect the server and begin to re-sync again.
  561. continue
  562. }
  563. //set read timeout
  564. if b.cfg.ReadTimeout > 0 {
  565. _ = b.c.SetReadDeadline(time.Now().Add(b.cfg.ReadTimeout))
  566. }
  567. // Reset retry count on successful packet receieve
  568. b.retryCount = 0
  569. switch data[0] {
  570. case OK_HEADER:
  571. if err = b.parseEvent(s, data); err != nil {
  572. s.closeWithError(err)
  573. return
  574. }
  575. case ERR_HEADER:
  576. err = b.c.HandleErrorPacket(data)
  577. s.closeWithError(err)
  578. return
  579. case EOF_HEADER:
  580. // refer to https://dev.mysql.com/doc/internals/en/com-binlog-dump.html#binlog-dump-non-block
  581. // when COM_BINLOG_DUMP command use BINLOG_DUMP_NON_BLOCK flag,
  582. // if there is no more event to send an EOF_Packet instead of blocking the connection
  583. log.Info("receive EOF packet, no more binlog event now.")
  584. continue
  585. default:
  586. log.Errorf("invalid stream header %c", data[0])
  587. continue
  588. }
  589. }
  590. }
  591. func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
  592. //skip OK byte, 0x00
  593. data = data[1:]
  594. needACK := false
  595. if b.cfg.SemiSyncEnabled && (data[0] == SemiSyncIndicator) {
  596. needACK = (data[1] == 0x01)
  597. //skip semi sync header
  598. data = data[2:]
  599. }
  600. e, err := b.parser.Parse(data)
  601. if err != nil {
  602. return errors.Trace(err)
  603. }
  604. if e.Header.LogPos > 0 {
  605. // Some events like FormatDescriptionEvent return 0, ignore.
  606. b.nextPos.Pos = e.Header.LogPos
  607. }
  608. getCurrentGtidSet := func() GTIDSet {
  609. if b.currGset == nil {
  610. return nil
  611. }
  612. return b.currGset.Clone()
  613. }
  614. advanceCurrentGtidSet := func(gtid string) error {
  615. if b.currGset == nil {
  616. b.currGset = b.prevGset.Clone()
  617. }
  618. prev := b.currGset.Clone()
  619. err := b.currGset.Update(gtid)
  620. if err == nil {
  621. // right after reconnect we will see same gtid as we saw before, thus currGset will not get changed
  622. if !b.currGset.Equal(prev) {
  623. b.prevGset = prev
  624. }
  625. }
  626. return err
  627. }
  628. switch event := e.Event.(type) {
  629. case *RotateEvent:
  630. b.nextPos.Name = string(event.NextLogName)
  631. b.nextPos.Pos = uint32(event.Position)
  632. log.Infof("rotate to %s", b.nextPos)
  633. case *GTIDEvent:
  634. if b.prevGset == nil {
  635. break
  636. }
  637. u, _ := uuid.FromBytes(event.SID)
  638. err := advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), event.GNO))
  639. if err != nil {
  640. return errors.Trace(err)
  641. }
  642. case *MariadbGTIDEvent:
  643. if b.prevGset == nil {
  644. break
  645. }
  646. GTID := event.GTID
  647. err := advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
  648. if err != nil {
  649. return errors.Trace(err)
  650. }
  651. case *XIDEvent:
  652. event.GSet = getCurrentGtidSet()
  653. case *QueryEvent:
  654. event.GSet = getCurrentGtidSet()
  655. }
  656. needStop := false
  657. select {
  658. case s.ch <- e:
  659. case <-b.ctx.Done():
  660. needStop = true
  661. }
  662. if needACK {
  663. err := b.replySemiSyncACK(b.nextPos)
  664. if err != nil {
  665. return errors.Trace(err)
  666. }
  667. }
  668. if needStop {
  669. return errors.New("sync is been closing...")
  670. }
  671. return nil
  672. }
  673. // LastConnectionID returns last connectionID.
  674. func (b *BinlogSyncer) LastConnectionID() uint32 {
  675. return b.lastConnectionID
  676. }
  677. func (b *BinlogSyncer) newConnection() (*client.Conn, error) {
  678. var addr string
  679. if b.cfg.Port != 0 {
  680. addr = fmt.Sprintf("%s:%d", b.cfg.Host, b.cfg.Port)
  681. } else {
  682. addr = b.cfg.Host
  683. }
  684. return client.Connect(addr, b.cfg.User, b.cfg.Password, "", func(c *client.Conn) {
  685. c.SetTLSConfig(b.cfg.TLSConfig)
  686. })
  687. }
  688. func (b *BinlogSyncer) killConnection(conn *client.Conn, id uint32) {
  689. cmd := fmt.Sprintf("KILL %d", id)
  690. if _, err := conn.Execute(cmd); err != nil {
  691. log.Errorf("kill connection %d error %v", id, err)
  692. // Unknown thread id
  693. if code := ErrorCode(err.Error()); code != ER_NO_SUCH_THREAD {
  694. log.Error(errors.Trace(err))
  695. }
  696. }
  697. log.Infof("kill last connection id %d", id)
  698. }