event.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. package replication
  2. import (
  3. "encoding/binary"
  4. "encoding/hex"
  5. "fmt"
  6. "io"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "unicode"
  11. "github.com/google/uuid"
  12. "github.com/pingcap/errors"
  13. . "github.com/go-mysql-org/go-mysql/mysql"
  14. )
  15. const (
  16. EventHeaderSize = 19
  17. SidLength = 16
  18. LogicalTimestampTypeCode = 2
  19. PartLogicalTimestampLength = 8
  20. BinlogChecksumLength = 4
  21. UndefinedServerVer = 999999 // UNDEFINED_SERVER_VERSION
  22. )
  23. type BinlogEvent struct {
  24. // raw binlog data which contains all data, including binlog header and event body, and including crc32 checksum if exists
  25. RawData []byte
  26. Header *EventHeader
  27. Event Event
  28. }
  29. func (e *BinlogEvent) Dump(w io.Writer) {
  30. e.Header.Dump(w)
  31. e.Event.Dump(w)
  32. }
  33. type Event interface {
  34. //Dump Event, format like python-mysql-replication
  35. Dump(w io.Writer)
  36. Decode(data []byte) error
  37. }
  38. type EventError struct {
  39. Header *EventHeader
  40. //Error message
  41. Err string
  42. //Event data
  43. Data []byte
  44. }
  45. func (e *EventError) Error() string {
  46. return fmt.Sprintf("Header %#v, Data %q, Err: %v", e.Header, e.Data, e.Err)
  47. }
  48. type EventHeader struct {
  49. Timestamp uint32
  50. EventType EventType
  51. ServerID uint32
  52. EventSize uint32
  53. LogPos uint32
  54. Flags uint16
  55. }
  56. func (h *EventHeader) Decode(data []byte) error {
  57. if len(data) < EventHeaderSize {
  58. return errors.Errorf("header size too short %d, must 19", len(data))
  59. }
  60. pos := 0
  61. h.Timestamp = binary.LittleEndian.Uint32(data[pos:])
  62. pos += 4
  63. h.EventType = EventType(data[pos])
  64. pos++
  65. h.ServerID = binary.LittleEndian.Uint32(data[pos:])
  66. pos += 4
  67. h.EventSize = binary.LittleEndian.Uint32(data[pos:])
  68. pos += 4
  69. h.LogPos = binary.LittleEndian.Uint32(data[pos:])
  70. pos += 4
  71. h.Flags = binary.LittleEndian.Uint16(data[pos:])
  72. // pos += 2
  73. if h.EventSize < uint32(EventHeaderSize) {
  74. return errors.Errorf("invalid event size %d, must >= 19", h.EventSize)
  75. }
  76. return nil
  77. }
  78. func (h *EventHeader) Dump(w io.Writer) {
  79. fmt.Fprintf(w, "=== %s ===\n", h.EventType)
  80. fmt.Fprintf(w, "Date: %s\n", time.Unix(int64(h.Timestamp), 0).Format(TimeFormat))
  81. fmt.Fprintf(w, "Log position: %d\n", h.LogPos)
  82. fmt.Fprintf(w, "Event size: %d\n", h.EventSize)
  83. }
  84. var (
  85. checksumVersionSplitMysql []int = []int{5, 6, 1}
  86. checksumVersionProductMysql int = (checksumVersionSplitMysql[0]*256+checksumVersionSplitMysql[1])*256 + checksumVersionSplitMysql[2]
  87. checksumVersionSplitMariaDB []int = []int{5, 3, 0}
  88. checksumVersionProductMariaDB int = (checksumVersionSplitMariaDB[0]*256+checksumVersionSplitMariaDB[1])*256 + checksumVersionSplitMariaDB[2]
  89. )
  90. // server version format X.Y.Zabc, a is not . or number
  91. func splitServerVersion(server string) []int {
  92. seps := strings.Split(server, ".")
  93. if len(seps) < 3 {
  94. return []int{0, 0, 0}
  95. }
  96. x, _ := strconv.Atoi(seps[0])
  97. y, _ := strconv.Atoi(seps[1])
  98. index := 0
  99. for i, c := range seps[2] {
  100. if !unicode.IsNumber(c) {
  101. index = i
  102. break
  103. }
  104. }
  105. z, _ := strconv.Atoi(seps[2][0:index])
  106. return []int{x, y, z}
  107. }
  108. func calcVersionProduct(server string) int {
  109. versionSplit := splitServerVersion(server)
  110. return ((versionSplit[0]*256+versionSplit[1])*256 + versionSplit[2])
  111. }
  112. type FormatDescriptionEvent struct {
  113. Version uint16
  114. //len = 50
  115. ServerVersion []byte
  116. CreateTimestamp uint32
  117. EventHeaderLength uint8
  118. EventTypeHeaderLengths []byte
  119. // 0 is off, 1 is for CRC32, 255 is undefined
  120. ChecksumAlgorithm byte
  121. }
  122. func (e *FormatDescriptionEvent) Decode(data []byte) error {
  123. pos := 0
  124. e.Version = binary.LittleEndian.Uint16(data[pos:])
  125. pos += 2
  126. e.ServerVersion = make([]byte, 50)
  127. copy(e.ServerVersion, data[pos:])
  128. pos += 50
  129. e.CreateTimestamp = binary.LittleEndian.Uint32(data[pos:])
  130. pos += 4
  131. e.EventHeaderLength = data[pos]
  132. pos++
  133. if e.EventHeaderLength != byte(EventHeaderSize) {
  134. return errors.Errorf("invalid event header length %d, must 19", e.EventHeaderLength)
  135. }
  136. server := string(e.ServerVersion)
  137. checksumProduct := checksumVersionProductMysql
  138. if strings.Contains(strings.ToLower(server), "mariadb") {
  139. checksumProduct = checksumVersionProductMariaDB
  140. }
  141. if calcVersionProduct(string(e.ServerVersion)) >= checksumProduct {
  142. // here, the last 5 bytes is 1 byte check sum alg type and 4 byte checksum if exists
  143. e.ChecksumAlgorithm = data[len(data)-5]
  144. e.EventTypeHeaderLengths = data[pos : len(data)-5]
  145. } else {
  146. e.ChecksumAlgorithm = BINLOG_CHECKSUM_ALG_UNDEF
  147. e.EventTypeHeaderLengths = data[pos:]
  148. }
  149. return nil
  150. }
  151. func (e *FormatDescriptionEvent) Dump(w io.Writer) {
  152. fmt.Fprintf(w, "Version: %d\n", e.Version)
  153. fmt.Fprintf(w, "Server version: %s\n", e.ServerVersion)
  154. //fmt.Fprintf(w, "Create date: %s\n", time.Unix(int64(e.CreateTimestamp), 0).Format(TimeFormat))
  155. fmt.Fprintf(w, "Checksum algorithm: %d\n", e.ChecksumAlgorithm)
  156. //fmt.Fprintf(w, "Event header lengths: \n%s", hex.Dump(e.EventTypeHeaderLengths))
  157. fmt.Fprintln(w)
  158. }
  159. type RotateEvent struct {
  160. Position uint64
  161. NextLogName []byte
  162. }
  163. func (e *RotateEvent) Decode(data []byte) error {
  164. e.Position = binary.LittleEndian.Uint64(data[0:])
  165. e.NextLogName = data[8:]
  166. return nil
  167. }
  168. func (e *RotateEvent) Dump(w io.Writer) {
  169. fmt.Fprintf(w, "Position: %d\n", e.Position)
  170. fmt.Fprintf(w, "Next log name: %s\n", e.NextLogName)
  171. fmt.Fprintln(w)
  172. }
  173. type PreviousGTIDsEvent struct {
  174. GTIDSets string
  175. }
  176. func (e *PreviousGTIDsEvent) Decode(data []byte) error {
  177. var previousGTIDSets []string
  178. pos := 0
  179. uuidCount := binary.LittleEndian.Uint16(data[pos : pos+8])
  180. pos += 8
  181. for i := uint16(0); i < uuidCount; i++ {
  182. uuid := e.decodeUuid(data[pos : pos+16])
  183. pos += 16
  184. sliceCount := binary.LittleEndian.Uint16(data[pos : pos+8])
  185. pos += 8
  186. var intervals []string
  187. for i := uint16(0); i < sliceCount; i++ {
  188. start := e.decodeInterval(data[pos : pos+8])
  189. pos += 8
  190. stop := e.decodeInterval(data[pos : pos+8])
  191. pos += 8
  192. interval := ""
  193. if stop == start+1 {
  194. interval = fmt.Sprintf("%d", start)
  195. } else {
  196. interval = fmt.Sprintf("%d-%d", start, stop-1)
  197. }
  198. intervals = append(intervals, interval)
  199. }
  200. previousGTIDSets = append(previousGTIDSets, fmt.Sprintf("%s:%s", uuid, strings.Join(intervals, ":")))
  201. }
  202. e.GTIDSets = strings.Join(previousGTIDSets, ",")
  203. return nil
  204. }
  205. func (e *PreviousGTIDsEvent) Dump(w io.Writer) {
  206. fmt.Fprintf(w, "Previous GTID Event: %s\n", e.GTIDSets)
  207. fmt.Fprintln(w)
  208. }
  209. func (e *PreviousGTIDsEvent) decodeUuid(data []byte) string {
  210. return fmt.Sprintf("%s-%s-%s-%s-%s", hex.EncodeToString(data[0:4]), hex.EncodeToString(data[4:6]),
  211. hex.EncodeToString(data[6:8]), hex.EncodeToString(data[8:10]), hex.EncodeToString(data[10:]))
  212. }
  213. func (e *PreviousGTIDsEvent) decodeInterval(data []byte) uint64 {
  214. return binary.LittleEndian.Uint64(data)
  215. }
  216. type XIDEvent struct {
  217. XID uint64
  218. // in fact XIDEvent dosen't have the GTIDSet information, just for beneficial to use
  219. GSet GTIDSet
  220. }
  221. func (e *XIDEvent) Decode(data []byte) error {
  222. e.XID = binary.LittleEndian.Uint64(data)
  223. return nil
  224. }
  225. func (e *XIDEvent) Dump(w io.Writer) {
  226. fmt.Fprintf(w, "XID: %d\n", e.XID)
  227. if e.GSet != nil {
  228. fmt.Fprintf(w, "GTIDSet: %s\n", e.GSet.String())
  229. }
  230. fmt.Fprintln(w)
  231. }
  232. type QueryEvent struct {
  233. SlaveProxyID uint32
  234. ExecutionTime uint32
  235. ErrorCode uint16
  236. StatusVars []byte
  237. Schema []byte
  238. Query []byte
  239. // in fact QueryEvent dosen't have the GTIDSet information, just for beneficial to use
  240. GSet GTIDSet
  241. }
  242. func (e *QueryEvent) Decode(data []byte) error {
  243. pos := 0
  244. e.SlaveProxyID = binary.LittleEndian.Uint32(data[pos:])
  245. pos += 4
  246. e.ExecutionTime = binary.LittleEndian.Uint32(data[pos:])
  247. pos += 4
  248. schemaLength := data[pos]
  249. pos++
  250. e.ErrorCode = binary.LittleEndian.Uint16(data[pos:])
  251. pos += 2
  252. statusVarsLength := binary.LittleEndian.Uint16(data[pos:])
  253. pos += 2
  254. e.StatusVars = data[pos : pos+int(statusVarsLength)]
  255. pos += int(statusVarsLength)
  256. e.Schema = data[pos : pos+int(schemaLength)]
  257. pos += int(schemaLength)
  258. //skip 0x00
  259. pos++
  260. e.Query = data[pos:]
  261. return nil
  262. }
  263. func (e *QueryEvent) Dump(w io.Writer) {
  264. fmt.Fprintf(w, "Slave proxy ID: %d\n", e.SlaveProxyID)
  265. fmt.Fprintf(w, "Execution time: %d\n", e.ExecutionTime)
  266. fmt.Fprintf(w, "Error code: %d\n", e.ErrorCode)
  267. //fmt.Fprintf(w, "Status vars: \n%s", hex.Dump(e.StatusVars))
  268. fmt.Fprintf(w, "Schema: %s\n", e.Schema)
  269. fmt.Fprintf(w, "Query: %s\n", e.Query)
  270. if e.GSet != nil {
  271. fmt.Fprintf(w, "GTIDSet: %s\n", e.GSet.String())
  272. }
  273. fmt.Fprintln(w)
  274. }
  275. type GTIDEvent struct {
  276. CommitFlag uint8
  277. SID []byte
  278. GNO int64
  279. LastCommitted int64
  280. SequenceNumber int64
  281. // ImmediateCommitTimestamp/OriginalCommitTimestamp are introduced in MySQL-8.0.1, see:
  282. // https://mysqlhighavailability.com/replication-features-in-mysql-8-0-1/
  283. ImmediateCommitTimestamp uint64
  284. OriginalCommitTimestamp uint64
  285. // Total transaction length (including this GTIDEvent), introduced in MySQL-8.0.2, see:
  286. // https://mysqlhighavailability.com/taking-advantage-of-new-transaction-length-metadata/
  287. TransactionLength uint64
  288. // ImmediateServerVersion/OriginalServerVersion are introduced in MySQL-8.0.14, see
  289. // https://dev.mysql.com/doc/refman/8.0/en/replication-compatibility.html
  290. ImmediateServerVersion uint32
  291. OriginalServerVersion uint32
  292. }
  293. func (e *GTIDEvent) Decode(data []byte) error {
  294. pos := 0
  295. e.CommitFlag = data[pos]
  296. pos++
  297. e.SID = data[pos : pos+SidLength]
  298. pos += SidLength
  299. e.GNO = int64(binary.LittleEndian.Uint64(data[pos:]))
  300. pos += 8
  301. if len(data) >= 42 {
  302. if data[pos] == LogicalTimestampTypeCode {
  303. pos++
  304. e.LastCommitted = int64(binary.LittleEndian.Uint64(data[pos:]))
  305. pos += PartLogicalTimestampLength
  306. e.SequenceNumber = int64(binary.LittleEndian.Uint64(data[pos:]))
  307. pos += 8
  308. // IMMEDIATE_COMMIT_TIMESTAMP_LENGTH = 7
  309. if len(data)-pos < 7 {
  310. return nil
  311. }
  312. e.ImmediateCommitTimestamp = FixedLengthInt(data[pos : pos+7])
  313. pos += 7
  314. if (e.ImmediateCommitTimestamp & (uint64(1) << 55)) != 0 {
  315. // If the most significant bit set, another 7 byte follows representing OriginalCommitTimestamp
  316. e.ImmediateCommitTimestamp &= ^(uint64(1) << 55)
  317. e.OriginalCommitTimestamp = FixedLengthInt(data[pos : pos+7])
  318. pos += 7
  319. } else {
  320. // Otherwise OriginalCommitTimestamp == ImmediateCommitTimestamp
  321. e.OriginalCommitTimestamp = e.ImmediateCommitTimestamp
  322. }
  323. // TRANSACTION_LENGTH_MIN_LENGTH = 1
  324. if len(data)-pos < 1 {
  325. return nil
  326. }
  327. var n int
  328. e.TransactionLength, _, n = LengthEncodedInt(data[pos:])
  329. pos += n
  330. // IMMEDIATE_SERVER_VERSION_LENGTH = 4
  331. e.ImmediateServerVersion = UndefinedServerVer
  332. e.OriginalServerVersion = UndefinedServerVer
  333. if len(data)-pos < 4 {
  334. return nil
  335. }
  336. e.ImmediateServerVersion = binary.LittleEndian.Uint32(data[pos:])
  337. pos += 4
  338. if (e.ImmediateServerVersion & (uint32(1) << 31)) != 0 {
  339. // If the most significant bit set, another 4 byte follows representing OriginalServerVersion
  340. e.ImmediateServerVersion &= ^(uint32(1) << 31)
  341. e.OriginalServerVersion = binary.LittleEndian.Uint32(data[pos:])
  342. // pos += 4
  343. } else {
  344. // Otherwise OriginalServerVersion == ImmediateServerVersion
  345. e.OriginalServerVersion = e.ImmediateServerVersion
  346. }
  347. }
  348. }
  349. return nil
  350. }
  351. func (e *GTIDEvent) Dump(w io.Writer) {
  352. fmtTime := func(t time.Time) string {
  353. if t.IsZero() {
  354. return "<n/a>"
  355. }
  356. return t.Format(time.RFC3339Nano)
  357. }
  358. fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag)
  359. u, _ := uuid.FromBytes(e.SID)
  360. fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO)
  361. fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted)
  362. fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber)
  363. fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, fmtTime(e.ImmediateCommitTime()))
  364. fmt.Fprintf(w, "Orignal commmit timestamp: %d (%s)\n", e.OriginalCommitTimestamp, fmtTime(e.OriginalCommitTime()))
  365. fmt.Fprintf(w, "Transaction length: %d\n", e.TransactionLength)
  366. fmt.Fprintf(w, "Immediate server version: %d\n", e.ImmediateServerVersion)
  367. fmt.Fprintf(w, "Orignal server version: %d\n", e.OriginalServerVersion)
  368. fmt.Fprintln(w)
  369. }
  370. // ImmediateCommitTime returns the commit time of this trx on the immediate server
  371. // or zero time if not available.
  372. func (e *GTIDEvent) ImmediateCommitTime() time.Time {
  373. return microSecTimestampToTime(e.ImmediateCommitTimestamp)
  374. }
  375. // OriginalCommitTime returns the commit time of this trx on the original server
  376. // or zero time if not available.
  377. func (e *GTIDEvent) OriginalCommitTime() time.Time {
  378. return microSecTimestampToTime(e.OriginalCommitTimestamp)
  379. }
  380. type BeginLoadQueryEvent struct {
  381. FileID uint32
  382. BlockData []byte
  383. }
  384. func (e *BeginLoadQueryEvent) Decode(data []byte) error {
  385. pos := 0
  386. e.FileID = binary.LittleEndian.Uint32(data[pos:])
  387. pos += 4
  388. e.BlockData = data[pos:]
  389. return nil
  390. }
  391. func (e *BeginLoadQueryEvent) Dump(w io.Writer) {
  392. fmt.Fprintf(w, "File ID: %d\n", e.FileID)
  393. fmt.Fprintf(w, "Block data: %s\n", e.BlockData)
  394. fmt.Fprintln(w)
  395. }
  396. type ExecuteLoadQueryEvent struct {
  397. SlaveProxyID uint32
  398. ExecutionTime uint32
  399. SchemaLength uint8
  400. ErrorCode uint16
  401. StatusVars uint16
  402. FileID uint32
  403. StartPos uint32
  404. EndPos uint32
  405. DupHandlingFlags uint8
  406. }
  407. func (e *ExecuteLoadQueryEvent) Decode(data []byte) error {
  408. pos := 0
  409. e.SlaveProxyID = binary.LittleEndian.Uint32(data[pos:])
  410. pos += 4
  411. e.ExecutionTime = binary.LittleEndian.Uint32(data[pos:])
  412. pos += 4
  413. e.SchemaLength = data[pos]
  414. pos++
  415. e.ErrorCode = binary.LittleEndian.Uint16(data[pos:])
  416. pos += 2
  417. e.StatusVars = binary.LittleEndian.Uint16(data[pos:])
  418. pos += 2
  419. e.FileID = binary.LittleEndian.Uint32(data[pos:])
  420. pos += 4
  421. e.StartPos = binary.LittleEndian.Uint32(data[pos:])
  422. pos += 4
  423. e.EndPos = binary.LittleEndian.Uint32(data[pos:])
  424. pos += 4
  425. e.DupHandlingFlags = data[pos]
  426. return nil
  427. }
  428. func (e *ExecuteLoadQueryEvent) Dump(w io.Writer) {
  429. fmt.Fprintf(w, "Slave proxy ID: %d\n", e.SlaveProxyID)
  430. fmt.Fprintf(w, "Execution time: %d\n", e.ExecutionTime)
  431. fmt.Fprintf(w, "Schame length: %d\n", e.SchemaLength)
  432. fmt.Fprintf(w, "Error code: %d\n", e.ErrorCode)
  433. fmt.Fprintf(w, "Status vars length: %d\n", e.StatusVars)
  434. fmt.Fprintf(w, "File ID: %d\n", e.FileID)
  435. fmt.Fprintf(w, "Start pos: %d\n", e.StartPos)
  436. fmt.Fprintf(w, "End pos: %d\n", e.EndPos)
  437. fmt.Fprintf(w, "Dup handling flags: %d\n", e.DupHandlingFlags)
  438. fmt.Fprintln(w)
  439. }
  440. // case MARIADB_ANNOTATE_ROWS_EVENT:
  441. // return "MariadbAnnotateRowsEvent"
  442. type MariadbAnnotateRowsEvent struct {
  443. Query []byte
  444. }
  445. func (e *MariadbAnnotateRowsEvent) Decode(data []byte) error {
  446. e.Query = data
  447. return nil
  448. }
  449. func (e *MariadbAnnotateRowsEvent) Dump(w io.Writer) {
  450. fmt.Fprintf(w, "Query: %s\n", e.Query)
  451. fmt.Fprintln(w)
  452. }
  453. type MariadbBinlogCheckPointEvent struct {
  454. Info []byte
  455. }
  456. func (e *MariadbBinlogCheckPointEvent) Decode(data []byte) error {
  457. e.Info = data
  458. return nil
  459. }
  460. func (e *MariadbBinlogCheckPointEvent) Dump(w io.Writer) {
  461. fmt.Fprintf(w, "Info: %s\n", e.Info)
  462. fmt.Fprintln(w)
  463. }
  464. type MariadbGTIDEvent struct {
  465. GTID MariadbGTID
  466. Flags byte
  467. CommitID uint64
  468. }
  469. func (e *MariadbGTIDEvent) IsDDL() bool {
  470. return (e.Flags & BINLOG_MARIADB_FL_DDL) != 0
  471. }
  472. func (e *MariadbGTIDEvent) IsStandalone() bool {
  473. return (e.Flags & BINLOG_MARIADB_FL_STANDALONE) != 0
  474. }
  475. func (e *MariadbGTIDEvent) IsGroupCommit() bool {
  476. return (e.Flags & BINLOG_MARIADB_FL_GROUP_COMMIT_ID) != 0
  477. }
  478. func (e *MariadbGTIDEvent) Decode(data []byte) error {
  479. pos := 0
  480. e.GTID.SequenceNumber = binary.LittleEndian.Uint64(data)
  481. pos += 8
  482. e.GTID.DomainID = binary.LittleEndian.Uint32(data[pos:])
  483. pos += 4
  484. e.Flags = data[pos]
  485. pos += 1
  486. if (e.Flags & BINLOG_MARIADB_FL_GROUP_COMMIT_ID) > 0 {
  487. e.CommitID = binary.LittleEndian.Uint64(data[pos:])
  488. }
  489. return nil
  490. }
  491. func (e *MariadbGTIDEvent) Dump(w io.Writer) {
  492. fmt.Fprintf(w, "GTID: %v\n", e.GTID)
  493. fmt.Fprintf(w, "Flags: %v\n", e.Flags)
  494. fmt.Fprintf(w, "CommitID: %v\n", e.CommitID)
  495. fmt.Fprintln(w)
  496. }
  497. type MariadbGTIDListEvent struct {
  498. GTIDs []MariadbGTID
  499. }
  500. func (e *MariadbGTIDListEvent) Decode(data []byte) error {
  501. pos := 0
  502. v := binary.LittleEndian.Uint32(data[pos:])
  503. pos += 4
  504. count := v & uint32((1<<28)-1)
  505. e.GTIDs = make([]MariadbGTID, count)
  506. for i := uint32(0); i < count; i++ {
  507. e.GTIDs[i].DomainID = binary.LittleEndian.Uint32(data[pos:])
  508. pos += 4
  509. e.GTIDs[i].ServerID = binary.LittleEndian.Uint32(data[pos:])
  510. pos += 4
  511. e.GTIDs[i].SequenceNumber = binary.LittleEndian.Uint64(data[pos:])
  512. pos += 8
  513. }
  514. return nil
  515. }
  516. func (e *MariadbGTIDListEvent) Dump(w io.Writer) {
  517. fmt.Fprintf(w, "Lists: %v\n", e.GTIDs)
  518. fmt.Fprintln(w)
  519. }
  520. type IntVarEvent struct {
  521. Type IntVarEventType
  522. Value uint64
  523. }
  524. func (i *IntVarEvent) Decode(data []byte) error {
  525. i.Type = IntVarEventType(data[0])
  526. i.Value = binary.LittleEndian.Uint64(data[1:])
  527. return nil
  528. }
  529. func (i *IntVarEvent) Dump(w io.Writer) {
  530. fmt.Fprintf(w, "Type: %d\n", i.Type)
  531. fmt.Fprintf(w, "Value: %d\n", i.Value)
  532. }