123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661 |
- package replication
- import (
- "encoding/binary"
- "encoding/hex"
- "fmt"
- "io"
- "strconv"
- "strings"
- "time"
- "unicode"
- "github.com/google/uuid"
- "github.com/pingcap/errors"
- . "github.com/go-mysql-org/go-mysql/mysql"
- )
- const (
- EventHeaderSize = 19
- SidLength = 16
- LogicalTimestampTypeCode = 2
- PartLogicalTimestampLength = 8
- BinlogChecksumLength = 4
- UndefinedServerVer = 999999 // UNDEFINED_SERVER_VERSION
- )
- type BinlogEvent struct {
- // raw binlog data which contains all data, including binlog header and event body, and including crc32 checksum if exists
- RawData []byte
- Header *EventHeader
- Event Event
- }
- func (e *BinlogEvent) Dump(w io.Writer) {
- e.Header.Dump(w)
- e.Event.Dump(w)
- }
- type Event interface {
- //Dump Event, format like python-mysql-replication
- Dump(w io.Writer)
- Decode(data []byte) error
- }
- type EventError struct {
- Header *EventHeader
- //Error message
- Err string
- //Event data
- Data []byte
- }
- func (e *EventError) Error() string {
- return fmt.Sprintf("Header %#v, Data %q, Err: %v", e.Header, e.Data, e.Err)
- }
- type EventHeader struct {
- Timestamp uint32
- EventType EventType
- ServerID uint32
- EventSize uint32
- LogPos uint32
- Flags uint16
- }
- func (h *EventHeader) Decode(data []byte) error {
- if len(data) < EventHeaderSize {
- return errors.Errorf("header size too short %d, must 19", len(data))
- }
- pos := 0
- h.Timestamp = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- h.EventType = EventType(data[pos])
- pos++
- h.ServerID = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- h.EventSize = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- h.LogPos = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- h.Flags = binary.LittleEndian.Uint16(data[pos:])
- // pos += 2
- if h.EventSize < uint32(EventHeaderSize) {
- return errors.Errorf("invalid event size %d, must >= 19", h.EventSize)
- }
- return nil
- }
- func (h *EventHeader) Dump(w io.Writer) {
- fmt.Fprintf(w, "=== %s ===\n", h.EventType)
- fmt.Fprintf(w, "Date: %s\n", time.Unix(int64(h.Timestamp), 0).Format(TimeFormat))
- fmt.Fprintf(w, "Log position: %d\n", h.LogPos)
- fmt.Fprintf(w, "Event size: %d\n", h.EventSize)
- }
- var (
- checksumVersionSplitMysql []int = []int{5, 6, 1}
- checksumVersionProductMysql int = (checksumVersionSplitMysql[0]*256+checksumVersionSplitMysql[1])*256 + checksumVersionSplitMysql[2]
- checksumVersionSplitMariaDB []int = []int{5, 3, 0}
- checksumVersionProductMariaDB int = (checksumVersionSplitMariaDB[0]*256+checksumVersionSplitMariaDB[1])*256 + checksumVersionSplitMariaDB[2]
- )
- // server version format X.Y.Zabc, a is not . or number
- func splitServerVersion(server string) []int {
- seps := strings.Split(server, ".")
- if len(seps) < 3 {
- return []int{0, 0, 0}
- }
- x, _ := strconv.Atoi(seps[0])
- y, _ := strconv.Atoi(seps[1])
- index := 0
- for i, c := range seps[2] {
- if !unicode.IsNumber(c) {
- index = i
- break
- }
- }
- z, _ := strconv.Atoi(seps[2][0:index])
- return []int{x, y, z}
- }
- func calcVersionProduct(server string) int {
- versionSplit := splitServerVersion(server)
- return ((versionSplit[0]*256+versionSplit[1])*256 + versionSplit[2])
- }
- type FormatDescriptionEvent struct {
- Version uint16
- //len = 50
- ServerVersion []byte
- CreateTimestamp uint32
- EventHeaderLength uint8
- EventTypeHeaderLengths []byte
- // 0 is off, 1 is for CRC32, 255 is undefined
- ChecksumAlgorithm byte
- }
- func (e *FormatDescriptionEvent) Decode(data []byte) error {
- pos := 0
- e.Version = binary.LittleEndian.Uint16(data[pos:])
- pos += 2
- e.ServerVersion = make([]byte, 50)
- copy(e.ServerVersion, data[pos:])
- pos += 50
- e.CreateTimestamp = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.EventHeaderLength = data[pos]
- pos++
- if e.EventHeaderLength != byte(EventHeaderSize) {
- return errors.Errorf("invalid event header length %d, must 19", e.EventHeaderLength)
- }
- server := string(e.ServerVersion)
- checksumProduct := checksumVersionProductMysql
- if strings.Contains(strings.ToLower(server), "mariadb") {
- checksumProduct = checksumVersionProductMariaDB
- }
- if calcVersionProduct(string(e.ServerVersion)) >= checksumProduct {
- // here, the last 5 bytes is 1 byte check sum alg type and 4 byte checksum if exists
- e.ChecksumAlgorithm = data[len(data)-5]
- e.EventTypeHeaderLengths = data[pos : len(data)-5]
- } else {
- e.ChecksumAlgorithm = BINLOG_CHECKSUM_ALG_UNDEF
- e.EventTypeHeaderLengths = data[pos:]
- }
- return nil
- }
- func (e *FormatDescriptionEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "Version: %d\n", e.Version)
- fmt.Fprintf(w, "Server version: %s\n", e.ServerVersion)
- //fmt.Fprintf(w, "Create date: %s\n", time.Unix(int64(e.CreateTimestamp), 0).Format(TimeFormat))
- fmt.Fprintf(w, "Checksum algorithm: %d\n", e.ChecksumAlgorithm)
- //fmt.Fprintf(w, "Event header lengths: \n%s", hex.Dump(e.EventTypeHeaderLengths))
- fmt.Fprintln(w)
- }
- type RotateEvent struct {
- Position uint64
- NextLogName []byte
- }
- func (e *RotateEvent) Decode(data []byte) error {
- e.Position = binary.LittleEndian.Uint64(data[0:])
- e.NextLogName = data[8:]
- return nil
- }
- func (e *RotateEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "Position: %d\n", e.Position)
- fmt.Fprintf(w, "Next log name: %s\n", e.NextLogName)
- fmt.Fprintln(w)
- }
- type PreviousGTIDsEvent struct {
- GTIDSets string
- }
- func (e *PreviousGTIDsEvent) Decode(data []byte) error {
- var previousGTIDSets []string
- pos := 0
- uuidCount := binary.LittleEndian.Uint16(data[pos : pos+8])
- pos += 8
- for i := uint16(0); i < uuidCount; i++ {
- uuid := e.decodeUuid(data[pos : pos+16])
- pos += 16
- sliceCount := binary.LittleEndian.Uint16(data[pos : pos+8])
- pos += 8
- var intervals []string
- for i := uint16(0); i < sliceCount; i++ {
- start := e.decodeInterval(data[pos : pos+8])
- pos += 8
- stop := e.decodeInterval(data[pos : pos+8])
- pos += 8
- interval := ""
- if stop == start+1 {
- interval = fmt.Sprintf("%d", start)
- } else {
- interval = fmt.Sprintf("%d-%d", start, stop-1)
- }
- intervals = append(intervals, interval)
- }
- previousGTIDSets = append(previousGTIDSets, fmt.Sprintf("%s:%s", uuid, strings.Join(intervals, ":")))
- }
- e.GTIDSets = strings.Join(previousGTIDSets, ",")
- return nil
- }
- func (e *PreviousGTIDsEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "Previous GTID Event: %s\n", e.GTIDSets)
- fmt.Fprintln(w)
- }
- func (e *PreviousGTIDsEvent) decodeUuid(data []byte) string {
- return fmt.Sprintf("%s-%s-%s-%s-%s", hex.EncodeToString(data[0:4]), hex.EncodeToString(data[4:6]),
- hex.EncodeToString(data[6:8]), hex.EncodeToString(data[8:10]), hex.EncodeToString(data[10:]))
- }
- func (e *PreviousGTIDsEvent) decodeInterval(data []byte) uint64 {
- return binary.LittleEndian.Uint64(data)
- }
- type XIDEvent struct {
- XID uint64
- // in fact XIDEvent dosen't have the GTIDSet information, just for beneficial to use
- GSet GTIDSet
- }
- func (e *XIDEvent) Decode(data []byte) error {
- e.XID = binary.LittleEndian.Uint64(data)
- return nil
- }
- func (e *XIDEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "XID: %d\n", e.XID)
- if e.GSet != nil {
- fmt.Fprintf(w, "GTIDSet: %s\n", e.GSet.String())
- }
- fmt.Fprintln(w)
- }
- type QueryEvent struct {
- SlaveProxyID uint32
- ExecutionTime uint32
- ErrorCode uint16
- StatusVars []byte
- Schema []byte
- Query []byte
- // in fact QueryEvent dosen't have the GTIDSet information, just for beneficial to use
- GSet GTIDSet
- }
- func (e *QueryEvent) Decode(data []byte) error {
- pos := 0
- e.SlaveProxyID = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.ExecutionTime = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- schemaLength := data[pos]
- pos++
- e.ErrorCode = binary.LittleEndian.Uint16(data[pos:])
- pos += 2
- statusVarsLength := binary.LittleEndian.Uint16(data[pos:])
- pos += 2
- e.StatusVars = data[pos : pos+int(statusVarsLength)]
- pos += int(statusVarsLength)
- e.Schema = data[pos : pos+int(schemaLength)]
- pos += int(schemaLength)
- //skip 0x00
- pos++
- e.Query = data[pos:]
- return nil
- }
- func (e *QueryEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "Slave proxy ID: %d\n", e.SlaveProxyID)
- fmt.Fprintf(w, "Execution time: %d\n", e.ExecutionTime)
- fmt.Fprintf(w, "Error code: %d\n", e.ErrorCode)
- //fmt.Fprintf(w, "Status vars: \n%s", hex.Dump(e.StatusVars))
- fmt.Fprintf(w, "Schema: %s\n", e.Schema)
- fmt.Fprintf(w, "Query: %s\n", e.Query)
- if e.GSet != nil {
- fmt.Fprintf(w, "GTIDSet: %s\n", e.GSet.String())
- }
- fmt.Fprintln(w)
- }
- type GTIDEvent struct {
- CommitFlag uint8
- SID []byte
- GNO int64
- LastCommitted int64
- SequenceNumber int64
- // ImmediateCommitTimestamp/OriginalCommitTimestamp are introduced in MySQL-8.0.1, see:
- // https://mysqlhighavailability.com/replication-features-in-mysql-8-0-1/
- ImmediateCommitTimestamp uint64
- OriginalCommitTimestamp uint64
- // Total transaction length (including this GTIDEvent), introduced in MySQL-8.0.2, see:
- // https://mysqlhighavailability.com/taking-advantage-of-new-transaction-length-metadata/
- TransactionLength uint64
- // ImmediateServerVersion/OriginalServerVersion are introduced in MySQL-8.0.14, see
- // https://dev.mysql.com/doc/refman/8.0/en/replication-compatibility.html
- ImmediateServerVersion uint32
- OriginalServerVersion uint32
- }
- func (e *GTIDEvent) Decode(data []byte) error {
- pos := 0
- e.CommitFlag = data[pos]
- pos++
- e.SID = data[pos : pos+SidLength]
- pos += SidLength
- e.GNO = int64(binary.LittleEndian.Uint64(data[pos:]))
- pos += 8
- if len(data) >= 42 {
- if data[pos] == LogicalTimestampTypeCode {
- pos++
- e.LastCommitted = int64(binary.LittleEndian.Uint64(data[pos:]))
- pos += PartLogicalTimestampLength
- e.SequenceNumber = int64(binary.LittleEndian.Uint64(data[pos:]))
- pos += 8
- // IMMEDIATE_COMMIT_TIMESTAMP_LENGTH = 7
- if len(data)-pos < 7 {
- return nil
- }
- e.ImmediateCommitTimestamp = FixedLengthInt(data[pos : pos+7])
- pos += 7
- if (e.ImmediateCommitTimestamp & (uint64(1) << 55)) != 0 {
- // If the most significant bit set, another 7 byte follows representing OriginalCommitTimestamp
- e.ImmediateCommitTimestamp &= ^(uint64(1) << 55)
- e.OriginalCommitTimestamp = FixedLengthInt(data[pos : pos+7])
- pos += 7
- } else {
- // Otherwise OriginalCommitTimestamp == ImmediateCommitTimestamp
- e.OriginalCommitTimestamp = e.ImmediateCommitTimestamp
- }
- // TRANSACTION_LENGTH_MIN_LENGTH = 1
- if len(data)-pos < 1 {
- return nil
- }
- var n int
- e.TransactionLength, _, n = LengthEncodedInt(data[pos:])
- pos += n
- // IMMEDIATE_SERVER_VERSION_LENGTH = 4
- e.ImmediateServerVersion = UndefinedServerVer
- e.OriginalServerVersion = UndefinedServerVer
- if len(data)-pos < 4 {
- return nil
- }
- e.ImmediateServerVersion = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- if (e.ImmediateServerVersion & (uint32(1) << 31)) != 0 {
- // If the most significant bit set, another 4 byte follows representing OriginalServerVersion
- e.ImmediateServerVersion &= ^(uint32(1) << 31)
- e.OriginalServerVersion = binary.LittleEndian.Uint32(data[pos:])
- // pos += 4
- } else {
- // Otherwise OriginalServerVersion == ImmediateServerVersion
- e.OriginalServerVersion = e.ImmediateServerVersion
- }
- }
- }
- return nil
- }
- func (e *GTIDEvent) Dump(w io.Writer) {
- fmtTime := func(t time.Time) string {
- if t.IsZero() {
- return "<n/a>"
- }
- return t.Format(time.RFC3339Nano)
- }
- fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag)
- u, _ := uuid.FromBytes(e.SID)
- fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO)
- fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted)
- fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber)
- fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, fmtTime(e.ImmediateCommitTime()))
- fmt.Fprintf(w, "Orignal commmit timestamp: %d (%s)\n", e.OriginalCommitTimestamp, fmtTime(e.OriginalCommitTime()))
- fmt.Fprintf(w, "Transaction length: %d\n", e.TransactionLength)
- fmt.Fprintf(w, "Immediate server version: %d\n", e.ImmediateServerVersion)
- fmt.Fprintf(w, "Orignal server version: %d\n", e.OriginalServerVersion)
- fmt.Fprintln(w)
- }
- // ImmediateCommitTime returns the commit time of this trx on the immediate server
- // or zero time if not available.
- func (e *GTIDEvent) ImmediateCommitTime() time.Time {
- return microSecTimestampToTime(e.ImmediateCommitTimestamp)
- }
- // OriginalCommitTime returns the commit time of this trx on the original server
- // or zero time if not available.
- func (e *GTIDEvent) OriginalCommitTime() time.Time {
- return microSecTimestampToTime(e.OriginalCommitTimestamp)
- }
- type BeginLoadQueryEvent struct {
- FileID uint32
- BlockData []byte
- }
- func (e *BeginLoadQueryEvent) Decode(data []byte) error {
- pos := 0
- e.FileID = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.BlockData = data[pos:]
- return nil
- }
- func (e *BeginLoadQueryEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "File ID: %d\n", e.FileID)
- fmt.Fprintf(w, "Block data: %s\n", e.BlockData)
- fmt.Fprintln(w)
- }
- type ExecuteLoadQueryEvent struct {
- SlaveProxyID uint32
- ExecutionTime uint32
- SchemaLength uint8
- ErrorCode uint16
- StatusVars uint16
- FileID uint32
- StartPos uint32
- EndPos uint32
- DupHandlingFlags uint8
- }
- func (e *ExecuteLoadQueryEvent) Decode(data []byte) error {
- pos := 0
- e.SlaveProxyID = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.ExecutionTime = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.SchemaLength = data[pos]
- pos++
- e.ErrorCode = binary.LittleEndian.Uint16(data[pos:])
- pos += 2
- e.StatusVars = binary.LittleEndian.Uint16(data[pos:])
- pos += 2
- e.FileID = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.StartPos = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.EndPos = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.DupHandlingFlags = data[pos]
- return nil
- }
- func (e *ExecuteLoadQueryEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "Slave proxy ID: %d\n", e.SlaveProxyID)
- fmt.Fprintf(w, "Execution time: %d\n", e.ExecutionTime)
- fmt.Fprintf(w, "Schame length: %d\n", e.SchemaLength)
- fmt.Fprintf(w, "Error code: %d\n", e.ErrorCode)
- fmt.Fprintf(w, "Status vars length: %d\n", e.StatusVars)
- fmt.Fprintf(w, "File ID: %d\n", e.FileID)
- fmt.Fprintf(w, "Start pos: %d\n", e.StartPos)
- fmt.Fprintf(w, "End pos: %d\n", e.EndPos)
- fmt.Fprintf(w, "Dup handling flags: %d\n", e.DupHandlingFlags)
- fmt.Fprintln(w)
- }
- // case MARIADB_ANNOTATE_ROWS_EVENT:
- // return "MariadbAnnotateRowsEvent"
- type MariadbAnnotateRowsEvent struct {
- Query []byte
- }
- func (e *MariadbAnnotateRowsEvent) Decode(data []byte) error {
- e.Query = data
- return nil
- }
- func (e *MariadbAnnotateRowsEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "Query: %s\n", e.Query)
- fmt.Fprintln(w)
- }
- type MariadbBinlogCheckPointEvent struct {
- Info []byte
- }
- func (e *MariadbBinlogCheckPointEvent) Decode(data []byte) error {
- e.Info = data
- return nil
- }
- func (e *MariadbBinlogCheckPointEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "Info: %s\n", e.Info)
- fmt.Fprintln(w)
- }
- type MariadbGTIDEvent struct {
- GTID MariadbGTID
- Flags byte
- CommitID uint64
- }
- func (e *MariadbGTIDEvent) IsDDL() bool {
- return (e.Flags & BINLOG_MARIADB_FL_DDL) != 0
- }
- func (e *MariadbGTIDEvent) IsStandalone() bool {
- return (e.Flags & BINLOG_MARIADB_FL_STANDALONE) != 0
- }
- func (e *MariadbGTIDEvent) IsGroupCommit() bool {
- return (e.Flags & BINLOG_MARIADB_FL_GROUP_COMMIT_ID) != 0
- }
- func (e *MariadbGTIDEvent) Decode(data []byte) error {
- pos := 0
- e.GTID.SequenceNumber = binary.LittleEndian.Uint64(data)
- pos += 8
- e.GTID.DomainID = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.Flags = data[pos]
- pos += 1
- if (e.Flags & BINLOG_MARIADB_FL_GROUP_COMMIT_ID) > 0 {
- e.CommitID = binary.LittleEndian.Uint64(data[pos:])
- }
- return nil
- }
- func (e *MariadbGTIDEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "GTID: %v\n", e.GTID)
- fmt.Fprintf(w, "Flags: %v\n", e.Flags)
- fmt.Fprintf(w, "CommitID: %v\n", e.CommitID)
- fmt.Fprintln(w)
- }
- type MariadbGTIDListEvent struct {
- GTIDs []MariadbGTID
- }
- func (e *MariadbGTIDListEvent) Decode(data []byte) error {
- pos := 0
- v := binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- count := v & uint32((1<<28)-1)
- e.GTIDs = make([]MariadbGTID, count)
- for i := uint32(0); i < count; i++ {
- e.GTIDs[i].DomainID = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.GTIDs[i].ServerID = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.GTIDs[i].SequenceNumber = binary.LittleEndian.Uint64(data[pos:])
- pos += 8
- }
- return nil
- }
- func (e *MariadbGTIDListEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "Lists: %v\n", e.GTIDs)
- fmt.Fprintln(w)
- }
- type IntVarEvent struct {
- Type IntVarEventType
- Value uint64
- }
- func (i *IntVarEvent) Decode(data []byte) error {
- i.Type = IntVarEventType(data[0])
- i.Value = binary.LittleEndian.Uint64(data[1:])
- return nil
- }
- func (i *IntVarEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "Type: %d\n", i.Type)
- fmt.Fprintf(w, "Value: %d\n", i.Value)
- }
|