mariadb_gtid.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package mysql
  2. import (
  3. "bytes"
  4. "fmt"
  5. "sort"
  6. "strconv"
  7. "strings"
  8. "github.com/pingcap/errors"
  9. "github.com/siddontang/go-log/log"
  10. )
  // MariadbGTID is a single MariaDB global transaction ID. Its textual form is
  // "[domain ID]-[server-id]-[sequence]".
  type MariadbGTID struct {
  	// DomainID is the first component of the textual form.
  	DomainID uint32
  	// ServerID is the second component of the textual form.
  	ServerID uint32
  	// SequenceNumber is the third component of the textual form.
  	SequenceNumber uint64
  }
  17. // ParseMariadbGTID parses mariadb gtid, [domain ID]-[server-id]-[sequence]
  18. func ParseMariadbGTID(str string) (*MariadbGTID, error) {
  19. if len(str) == 0 {
  20. return &MariadbGTID{0, 0, 0}, nil
  21. }
  22. seps := strings.Split(str, "-")
  23. gtid := new(MariadbGTID)
  24. if len(seps) != 3 {
  25. return gtid, errors.Errorf("invalid Mariadb GTID %v, must domain-server-sequence", str)
  26. }
  27. domainID, err := strconv.ParseUint(seps[0], 10, 32)
  28. if err != nil {
  29. return gtid, errors.Errorf("invalid MariaDB GTID Domain ID (%v): %v", seps[0], err)
  30. }
  31. serverID, err := strconv.ParseUint(seps[1], 10, 32)
  32. if err != nil {
  33. return gtid, errors.Errorf("invalid MariaDB GTID Server ID (%v): %v", seps[1], err)
  34. }
  35. sequenceID, err := strconv.ParseUint(seps[2], 10, 64)
  36. if err != nil {
  37. return gtid, errors.Errorf("invalid MariaDB GTID Sequence number (%v): %v", seps[2], err)
  38. }
  39. return &MariadbGTID{
  40. DomainID: uint32(domainID),
  41. ServerID: uint32(serverID),
  42. SequenceNumber: sequenceID}, nil
  43. }
  44. func (gtid *MariadbGTID) String() string {
  45. if gtid.DomainID == 0 && gtid.ServerID == 0 && gtid.SequenceNumber == 0 {
  46. return ""
  47. }
  48. return fmt.Sprintf("%d-%d-%d", gtid.DomainID, gtid.ServerID, gtid.SequenceNumber)
  49. }
  50. // Contain return whether one mariadb gtid covers another mariadb gtid
  51. func (gtid *MariadbGTID) Contain(other *MariadbGTID) bool {
  52. return gtid.DomainID == other.DomainID && gtid.SequenceNumber >= other.SequenceNumber
  53. }
  54. // Clone clones a mariadb gtid
  55. func (gtid *MariadbGTID) Clone() *MariadbGTID {
  56. o := new(MariadbGTID)
  57. *o = *gtid
  58. return o
  59. }
  60. func (gtid *MariadbGTID) forward(newer *MariadbGTID) error {
  61. if newer.DomainID != gtid.DomainID {
  62. return errors.Errorf("%s is not same with doamin of %s", newer, gtid)
  63. }
  64. /*
  65. Here's a simplified example of binlog events.
  66. Although I think one domain should have only one update at same time, we can't limit the user's usage.
  67. we just output a warn log and let it go on
  68. | mysqld-bin.000001 | 1453 | Gtid | 112 | 1495 | BEGIN GTID 0-112-6 |
  69. | mysqld-bin.000001 | 1624 | Xid | 112 | 1655 | COMMIT xid=74 |
  70. | mysqld-bin.000001 | 1655 | Gtid | 112 | 1697 | BEGIN GTID 0-112-7 |
  71. | mysqld-bin.000001 | 1826 | Xid | 112 | 1857 | COMMIT xid=75 |
  72. | mysqld-bin.000001 | 1857 | Gtid | 111 | 1899 | BEGIN GTID 0-111-5 |
  73. | mysqld-bin.000001 | 1981 | Xid | 111 | 2012 | COMMIT xid=77 |
  74. | mysqld-bin.000001 | 2012 | Gtid | 112 | 2054 | BEGIN GTID 0-112-8 |
  75. | mysqld-bin.000001 | 2184 | Xid | 112 | 2215 | COMMIT xid=116 |
  76. | mysqld-bin.000001 | 2215 | Gtid | 111 | 2257 | BEGIN GTID 0-111-6 |
  77. */
  78. if newer.SequenceNumber <= gtid.SequenceNumber {
  79. log.Warnf("out of order binlog appears with gtid %s vs current position gtid %s", newer, gtid)
  80. }
  81. gtid.ServerID = newer.ServerID
  82. gtid.SequenceNumber = newer.SequenceNumber
  83. return nil
  84. }
  85. // MariadbGTIDSet is a set of mariadb gtid
  86. type MariadbGTIDSet struct {
  87. Sets map[uint32]*MariadbGTID
  88. }
  89. // ParseMariadbGTIDSet parses str into mariadb gtid sets
  90. func ParseMariadbGTIDSet(str string) (GTIDSet, error) {
  91. s := new(MariadbGTIDSet)
  92. s.Sets = make(map[uint32]*MariadbGTID)
  93. if str == "" {
  94. return s, nil
  95. }
  96. err := s.Update(str)
  97. if err != nil {
  98. return nil, err
  99. }
  100. return s, nil
  101. }
  102. // AddSet adds mariadb gtid into mariadb gtid set
  103. func (s *MariadbGTIDSet) AddSet(gtid *MariadbGTID) error {
  104. if gtid == nil {
  105. return nil
  106. }
  107. o, ok := s.Sets[gtid.DomainID]
  108. if ok {
  109. err := o.forward(gtid)
  110. if err != nil {
  111. return errors.Trace(err)
  112. }
  113. } else {
  114. s.Sets[gtid.DomainID] = gtid
  115. }
  116. return nil
  117. }
  118. // Update updates mariadb gtid set
  119. func (s *MariadbGTIDSet) Update(GTIDStr string) error {
  120. sp := strings.Split(GTIDStr, ",")
  121. //todo, handle redundant same uuid
  122. for i := 0; i < len(sp); i++ {
  123. gtid, err := ParseMariadbGTID(sp[i])
  124. if err != nil {
  125. return errors.Trace(err)
  126. }
  127. err = s.AddSet(gtid)
  128. if err != nil {
  129. return errors.Trace(err)
  130. }
  131. }
  132. return nil
  133. }
  134. func (s *MariadbGTIDSet) String() string {
  135. sets := make([]string, 0, len(s.Sets))
  136. for _, set := range s.Sets {
  137. sets = append(sets, set.String())
  138. }
  139. sort.Strings(sets)
  140. return strings.Join(sets, ",")
  141. }
  142. // Encode encodes mariadb gtid set
  143. func (s *MariadbGTIDSet) Encode() []byte {
  144. var buf bytes.Buffer
  145. sep := ""
  146. for _, gtid := range s.Sets {
  147. buf.WriteString(sep)
  148. buf.WriteString(gtid.String())
  149. sep = ","
  150. }
  151. return buf.Bytes()
  152. }
  153. // Clone clones a mariadb gtid set
  154. func (s *MariadbGTIDSet) Clone() GTIDSet {
  155. clone := &MariadbGTIDSet{
  156. Sets: make(map[uint32]*MariadbGTID),
  157. }
  158. for domainID, gtid := range s.Sets {
  159. clone.Sets[domainID] = gtid.Clone()
  160. }
  161. return clone
  162. }
  163. // Equal returns true if two mariadb gtid set is same, otherwise return false
  164. func (s *MariadbGTIDSet) Equal(o GTIDSet) bool {
  165. other, ok := o.(*MariadbGTIDSet)
  166. if !ok {
  167. return false
  168. }
  169. if len(other.Sets) != len(s.Sets) {
  170. return false
  171. }
  172. for domainID, gtid := range other.Sets {
  173. o, ok := s.Sets[domainID]
  174. if !ok {
  175. return false
  176. }
  177. if *gtid != *o {
  178. return false
  179. }
  180. }
  181. return true
  182. }
  183. // Contain return whether one mariadb gtid set covers another mariadb gtid set
  184. func (s *MariadbGTIDSet) Contain(o GTIDSet) bool {
  185. other, ok := o.(*MariadbGTIDSet)
  186. if !ok {
  187. return false
  188. }
  189. for doaminID, gtid := range other.Sets {
  190. o, ok := s.Sets[doaminID]
  191. if !ok {
  192. return false
  193. }
  194. if !o.Contain(gtid) {
  195. return false
  196. }
  197. }
  198. return true
  199. }