parser.go 10 KB


  1. package replication
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "hash/crc32"
  7. "io"
  8. "os"
  9. "sync/atomic"
  10. "time"
  11. "github.com/pingcap/errors"
  12. "github.com/go-mysql-org/go-mysql/utils"
  13. )
  14. var (
  15. // ErrChecksumMismatch indicates binlog checksum mismatch.
  16. ErrChecksumMismatch = errors.New("binlog checksum mismatch, data may be corrupted")
  17. )
  18. type BinlogParser struct {
  19. // "mysql" or "mariadb", if not set, use "mysql" by default
  20. flavor string
  21. format *FormatDescriptionEvent
  22. tables map[uint64]*TableMapEvent
  23. // for rawMode, we only parse FormatDescriptionEvent and RotateEvent
  24. rawMode bool
  25. parseTime bool
  26. timestampStringLocation *time.Location
  27. // used to start/stop processing
  28. stopProcessing uint32
  29. useDecimal bool
  30. ignoreJSONDecodeErr bool
  31. verifyChecksum bool
  32. }
  33. func NewBinlogParser() *BinlogParser {
  34. p := new(BinlogParser)
  35. p.tables = make(map[uint64]*TableMapEvent)
  36. return p
  37. }
  38. func (p *BinlogParser) Stop() {
  39. atomic.StoreUint32(&p.stopProcessing, 1)
  40. }
  41. func (p *BinlogParser) Resume() {
  42. atomic.StoreUint32(&p.stopProcessing, 0)
  43. }
  44. func (p *BinlogParser) Reset() {
  45. p.format = nil
  46. }
  47. type OnEventFunc func(*BinlogEvent) error
  48. func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) error {
  49. f, err := os.Open(name)
  50. if err != nil {
  51. return errors.Trace(err)
  52. }
  53. defer f.Close()
  54. b := make([]byte, 4)
  55. if _, err = f.Read(b); err != nil {
  56. return errors.Trace(err)
  57. } else if !bytes.Equal(b, BinLogFileHeader) {
  58. return errors.Errorf("%s is not a valid binlog file, head 4 bytes must fe'bin' ", name)
  59. }
  60. if offset < 4 {
  61. offset = 4
  62. } else if offset > 4 {
  63. // FORMAT_DESCRIPTION event should be read by default always (despite that fact passed offset may be higher than 4)
  64. if _, err = f.Seek(4, io.SeekStart); err != nil {
  65. return errors.Errorf("seek %s to %d error %v", name, offset, err)
  66. }
  67. if err = p.parseFormatDescriptionEvent(f, onEvent); err != nil {
  68. return errors.Annotatef(err, "parse FormatDescriptionEvent")
  69. }
  70. }
  71. if _, err = f.Seek(offset, io.SeekStart); err != nil {
  72. return errors.Errorf("seek %s to %d error %v", name, offset, err)
  73. }
  74. return p.ParseReader(f, onEvent)
  75. }
  76. func (p *BinlogParser) parseFormatDescriptionEvent(r io.Reader, onEvent OnEventFunc) error {
  77. _, err := p.parseSingleEvent(r, onEvent)
  78. return err
  79. }
  80. // ParseSingleEvent parses single binlog event and passes the event to onEvent function.
  81. func (p *BinlogParser) ParseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, error) {
  82. return p.parseSingleEvent(r, onEvent)
  83. }
  84. func (p *BinlogParser) parseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, error) {
  85. var err error
  86. var n int64
  87. // Here we use `sync.Pool` to avoid allocate/destroy buffers frequently.
  88. buf := utils.BytesBufferGet()
  89. defer utils.BytesBufferPut(buf)
  90. if n, err = io.CopyN(buf, r, EventHeaderSize); err == io.EOF {
  91. return true, nil
  92. } else if err != nil {
  93. return false, errors.Errorf("get event header err %v, need %d but got %d", err, EventHeaderSize, n)
  94. }
  95. var h *EventHeader
  96. h, err = p.parseHeader(buf.Bytes())
  97. if err != nil {
  98. return false, errors.Trace(err)
  99. }
  100. if h.EventSize < uint32(EventHeaderSize) {
  101. return false, errors.Errorf("invalid event header, event size is %d, too small", h.EventSize)
  102. }
  103. if n, err = io.CopyN(buf, r, int64(h.EventSize-EventHeaderSize)); err != nil {
  104. return false, errors.Errorf("get event err %v, need %d but got %d", err, h.EventSize, n)
  105. }
  106. if buf.Len() != int(h.EventSize) {
  107. return false, errors.Errorf("invalid raw data size in event %s, need %d but got %d", h.EventType, h.EventSize, buf.Len())
  108. }
  109. var rawData []byte
  110. rawData = append(rawData, buf.Bytes()...)
  111. bodyLen := int(h.EventSize) - EventHeaderSize
  112. body := rawData[EventHeaderSize:]
  113. if len(body) != bodyLen {
  114. return false, errors.Errorf("invalid body data size in event %s, need %d but got %d", h.EventType, bodyLen, len(body))
  115. }
  116. var e Event
  117. e, err = p.parseEvent(h, body, rawData)
  118. if err != nil {
  119. if err == errMissingTableMapEvent {
  120. return false, nil
  121. }
  122. return false, errors.Trace(err)
  123. }
  124. if err = onEvent(&BinlogEvent{RawData: rawData, Header: h, Event: e}); err != nil {
  125. return false, errors.Trace(err)
  126. }
  127. return false, nil
  128. }
  129. func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error {
  130. for {
  131. if atomic.LoadUint32(&p.stopProcessing) == 1 {
  132. break
  133. }
  134. done, err := p.parseSingleEvent(r, onEvent)
  135. if err != nil {
  136. if err == errMissingTableMapEvent {
  137. continue
  138. }
  139. return errors.Trace(err)
  140. }
  141. if done {
  142. break
  143. }
  144. }
  145. return nil
  146. }
  147. func (p *BinlogParser) SetRawMode(mode bool) {
  148. p.rawMode = mode
  149. }
  150. func (p *BinlogParser) SetParseTime(parseTime bool) {
  151. p.parseTime = parseTime
  152. }
  153. func (p *BinlogParser) SetTimestampStringLocation(timestampStringLocation *time.Location) {
  154. p.timestampStringLocation = timestampStringLocation
  155. }
  156. func (p *BinlogParser) SetUseDecimal(useDecimal bool) {
  157. p.useDecimal = useDecimal
  158. }
  159. func (p *BinlogParser) SetIgnoreJSONDecodeError(ignoreJSONDecodeErr bool) {
  160. p.ignoreJSONDecodeErr = ignoreJSONDecodeErr
  161. }
  162. func (p *BinlogParser) SetVerifyChecksum(verify bool) {
  163. p.verifyChecksum = verify
  164. }
  165. func (p *BinlogParser) SetFlavor(flavor string) {
  166. p.flavor = flavor
  167. }
  168. func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) {
  169. h := new(EventHeader)
  170. err := h.Decode(data)
  171. if err != nil {
  172. return nil, err
  173. }
  174. return h, nil
  175. }
  176. func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (Event, error) {
  177. var e Event
  178. if h.EventType == FORMAT_DESCRIPTION_EVENT {
  179. p.format = &FormatDescriptionEvent{}
  180. e = p.format
  181. } else {
  182. if p.format != nil && p.format.ChecksumAlgorithm == BINLOG_CHECKSUM_ALG_CRC32 {
  183. err := p.verifyCrc32Checksum(rawData)
  184. if err != nil {
  185. return nil, err
  186. }
  187. data = data[0 : len(data)-BinlogChecksumLength]
  188. }
  189. if h.EventType == ROTATE_EVENT {
  190. e = &RotateEvent{}
  191. } else if !p.rawMode {
  192. switch h.EventType {
  193. case QUERY_EVENT:
  194. e = &QueryEvent{}
  195. case XID_EVENT:
  196. e = &XIDEvent{}
  197. case TABLE_MAP_EVENT:
  198. te := &TableMapEvent{
  199. flavor: p.flavor,
  200. }
  201. if p.format.EventTypeHeaderLengths[TABLE_MAP_EVENT-1] == 6 {
  202. te.tableIDSize = 4
  203. } else {
  204. te.tableIDSize = 6
  205. }
  206. e = te
  207. case WRITE_ROWS_EVENTv0,
  208. UPDATE_ROWS_EVENTv0,
  209. DELETE_ROWS_EVENTv0,
  210. WRITE_ROWS_EVENTv1,
  211. DELETE_ROWS_EVENTv1,
  212. UPDATE_ROWS_EVENTv1,
  213. WRITE_ROWS_EVENTv2,
  214. UPDATE_ROWS_EVENTv2,
  215. DELETE_ROWS_EVENTv2:
  216. e = p.newRowsEvent(h)
  217. case ROWS_QUERY_EVENT:
  218. e = &RowsQueryEvent{}
  219. case GTID_EVENT:
  220. e = &GTIDEvent{}
  221. case ANONYMOUS_GTID_EVENT:
  222. e = &GTIDEvent{}
  223. case BEGIN_LOAD_QUERY_EVENT:
  224. e = &BeginLoadQueryEvent{}
  225. case EXECUTE_LOAD_QUERY_EVENT:
  226. e = &ExecuteLoadQueryEvent{}
  227. case MARIADB_ANNOTATE_ROWS_EVENT:
  228. e = &MariadbAnnotateRowsEvent{}
  229. case MARIADB_BINLOG_CHECKPOINT_EVENT:
  230. e = &MariadbBinlogCheckPointEvent{}
  231. case MARIADB_GTID_LIST_EVENT:
  232. e = &MariadbGTIDListEvent{}
  233. case MARIADB_GTID_EVENT:
  234. ee := &MariadbGTIDEvent{}
  235. ee.GTID.ServerID = h.ServerID
  236. e = ee
  237. case PREVIOUS_GTIDS_EVENT:
  238. e = &PreviousGTIDsEvent{}
  239. case INTVAR_EVENT:
  240. e = &IntVarEvent{}
  241. default:
  242. e = &GenericEvent{}
  243. }
  244. } else {
  245. e = &GenericEvent{}
  246. }
  247. }
  248. if err := e.Decode(data); err != nil {
  249. return nil, &EventError{h, err.Error(), data}
  250. }
  251. if te, ok := e.(*TableMapEvent); ok {
  252. p.tables[te.TableID] = te
  253. }
  254. if re, ok := e.(*RowsEvent); ok {
  255. if (re.Flags & RowsEventStmtEndFlag) > 0 {
  256. // Refer https://github.com/alibaba/canal/blob/38cc81b7dab29b51371096fb6763ca3a8432ffee/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java#L176
  257. p.tables = make(map[uint64]*TableMapEvent)
  258. }
  259. }
  260. return e, nil
  261. }
  262. // Parse: Given the bytes for a a binary log event: return the decoded event.
  263. // With the exception of the FORMAT_DESCRIPTION_EVENT event type
  264. // there must have previously been passed a FORMAT_DESCRIPTION_EVENT
  265. // into the parser for this to work properly on any given event.
  266. // Passing a new FORMAT_DESCRIPTION_EVENT into the parser will replace
  267. // an existing one.
  268. func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error) {
  269. rawData := data
  270. h, err := p.parseHeader(data)
  271. if err != nil {
  272. return nil, err
  273. }
  274. data = data[EventHeaderSize:]
  275. eventLen := int(h.EventSize) - EventHeaderSize
  276. if len(data) != eventLen {
  277. return nil, fmt.Errorf("invalid data size %d in event %s, less event length %d", len(data), h.EventType, eventLen)
  278. }
  279. e, err := p.parseEvent(h, data, rawData)
  280. if err != nil {
  281. return nil, err
  282. }
  283. return &BinlogEvent{RawData: rawData, Header: h, Event: e}, nil
  284. }
  285. func (p *BinlogParser) verifyCrc32Checksum(rawData []byte) error {
  286. if !p.verifyChecksum {
  287. return nil
  288. }
  289. calculatedPart := rawData[0 : len(rawData)-BinlogChecksumLength]
  290. expectedChecksum := rawData[len(rawData)-BinlogChecksumLength:]
  291. // mysql use zlib's CRC32 implementation, which uses polynomial 0xedb88320UL.
  292. // reference: https://github.com/madler/zlib/blob/master/crc32.c
  293. // https://github.com/madler/zlib/blob/master/doc/rfc1952.txt#L419
  294. checksum := crc32.ChecksumIEEE(calculatedPart)
  295. computed := make([]byte, BinlogChecksumLength)
  296. binary.LittleEndian.PutUint32(computed, checksum)
  297. if !bytes.Equal(expectedChecksum, computed) {
  298. return ErrChecksumMismatch
  299. }
  300. return nil
  301. }
  302. func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
  303. e := &RowsEvent{}
  304. if p.format.EventTypeHeaderLengths[h.EventType-1] == 6 {
  305. e.tableIDSize = 4
  306. } else {
  307. e.tableIDSize = 6
  308. }
  309. e.needBitmap2 = false
  310. e.tables = p.tables
  311. e.parseTime = p.parseTime
  312. e.timestampStringLocation = p.timestampStringLocation
  313. e.useDecimal = p.useDecimal
  314. e.ignoreJSONDecodeErr = p.ignoreJSONDecodeErr
  315. switch h.EventType {
  316. case WRITE_ROWS_EVENTv0:
  317. e.Version = 0
  318. case UPDATE_ROWS_EVENTv0:
  319. e.Version = 0
  320. case DELETE_ROWS_EVENTv0:
  321. e.Version = 0
  322. case WRITE_ROWS_EVENTv1:
  323. e.Version = 1
  324. case DELETE_ROWS_EVENTv1:
  325. e.Version = 1
  326. case UPDATE_ROWS_EVENTv1:
  327. e.Version = 1
  328. e.needBitmap2 = true
  329. case WRITE_ROWS_EVENTv2:
  330. e.Version = 2
  331. case UPDATE_ROWS_EVENTv2:
  332. e.Version = 2
  333. e.needBitmap2 = true
  334. case DELETE_ROWS_EVENTv2:
  335. e.Version = 2
  336. }
  337. return e
  338. }