123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- package mysql
- import (
- "bytes"
- "fmt"
- "sort"
- "strconv"
- "strings"
- "github.com/pingcap/errors"
- "github.com/siddontang/go-log/log"
- )
- // MariadbGTID represent mariadb gtid, [domain ID]-[server-id]-[sequence]
- type MariadbGTID struct {
- DomainID uint32
- ServerID uint32
- SequenceNumber uint64
- }
- // ParseMariadbGTID parses mariadb gtid, [domain ID]-[server-id]-[sequence]
- func ParseMariadbGTID(str string) (*MariadbGTID, error) {
- if len(str) == 0 {
- return &MariadbGTID{0, 0, 0}, nil
- }
- seps := strings.Split(str, "-")
- gtid := new(MariadbGTID)
- if len(seps) != 3 {
- return gtid, errors.Errorf("invalid Mariadb GTID %v, must domain-server-sequence", str)
- }
- domainID, err := strconv.ParseUint(seps[0], 10, 32)
- if err != nil {
- return gtid, errors.Errorf("invalid MariaDB GTID Domain ID (%v): %v", seps[0], err)
- }
- serverID, err := strconv.ParseUint(seps[1], 10, 32)
- if err != nil {
- return gtid, errors.Errorf("invalid MariaDB GTID Server ID (%v): %v", seps[1], err)
- }
- sequenceID, err := strconv.ParseUint(seps[2], 10, 64)
- if err != nil {
- return gtid, errors.Errorf("invalid MariaDB GTID Sequence number (%v): %v", seps[2], err)
- }
- return &MariadbGTID{
- DomainID: uint32(domainID),
- ServerID: uint32(serverID),
- SequenceNumber: sequenceID}, nil
- }
- func (gtid *MariadbGTID) String() string {
- if gtid.DomainID == 0 && gtid.ServerID == 0 && gtid.SequenceNumber == 0 {
- return ""
- }
- return fmt.Sprintf("%d-%d-%d", gtid.DomainID, gtid.ServerID, gtid.SequenceNumber)
- }
- // Contain return whether one mariadb gtid covers another mariadb gtid
- func (gtid *MariadbGTID) Contain(other *MariadbGTID) bool {
- return gtid.DomainID == other.DomainID && gtid.SequenceNumber >= other.SequenceNumber
- }
- // Clone clones a mariadb gtid
- func (gtid *MariadbGTID) Clone() *MariadbGTID {
- o := new(MariadbGTID)
- *o = *gtid
- return o
- }
- func (gtid *MariadbGTID) forward(newer *MariadbGTID) error {
- if newer.DomainID != gtid.DomainID {
- return errors.Errorf("%s is not same with doamin of %s", newer, gtid)
- }
- /*
- Here's a simplified example of binlog events.
- Although I think one domain should have only one update at same time, we can't limit the user's usage.
- we just output a warn log and let it go on
- | mysqld-bin.000001 | 1453 | Gtid | 112 | 1495 | BEGIN GTID 0-112-6 |
- | mysqld-bin.000001 | 1624 | Xid | 112 | 1655 | COMMIT xid=74 |
- | mysqld-bin.000001 | 1655 | Gtid | 112 | 1697 | BEGIN GTID 0-112-7 |
- | mysqld-bin.000001 | 1826 | Xid | 112 | 1857 | COMMIT xid=75 |
- | mysqld-bin.000001 | 1857 | Gtid | 111 | 1899 | BEGIN GTID 0-111-5 |
- | mysqld-bin.000001 | 1981 | Xid | 111 | 2012 | COMMIT xid=77 |
- | mysqld-bin.000001 | 2012 | Gtid | 112 | 2054 | BEGIN GTID 0-112-8 |
- | mysqld-bin.000001 | 2184 | Xid | 112 | 2215 | COMMIT xid=116 |
- | mysqld-bin.000001 | 2215 | Gtid | 111 | 2257 | BEGIN GTID 0-111-6 |
- */
- if newer.SequenceNumber <= gtid.SequenceNumber {
- log.Warnf("out of order binlog appears with gtid %s vs current position gtid %s", newer, gtid)
- }
- gtid.ServerID = newer.ServerID
- gtid.SequenceNumber = newer.SequenceNumber
- return nil
- }
- // MariadbGTIDSet is a set of mariadb gtid
- type MariadbGTIDSet struct {
- Sets map[uint32]*MariadbGTID
- }
- // ParseMariadbGTIDSet parses str into mariadb gtid sets
- func ParseMariadbGTIDSet(str string) (GTIDSet, error) {
- s := new(MariadbGTIDSet)
- s.Sets = make(map[uint32]*MariadbGTID)
- if str == "" {
- return s, nil
- }
- err := s.Update(str)
- if err != nil {
- return nil, err
- }
- return s, nil
- }
- // AddSet adds mariadb gtid into mariadb gtid set
- func (s *MariadbGTIDSet) AddSet(gtid *MariadbGTID) error {
- if gtid == nil {
- return nil
- }
- o, ok := s.Sets[gtid.DomainID]
- if ok {
- err := o.forward(gtid)
- if err != nil {
- return errors.Trace(err)
- }
- } else {
- s.Sets[gtid.DomainID] = gtid
- }
- return nil
- }
- // Update updates mariadb gtid set
- func (s *MariadbGTIDSet) Update(GTIDStr string) error {
- sp := strings.Split(GTIDStr, ",")
- //todo, handle redundant same uuid
- for i := 0; i < len(sp); i++ {
- gtid, err := ParseMariadbGTID(sp[i])
- if err != nil {
- return errors.Trace(err)
- }
- err = s.AddSet(gtid)
- if err != nil {
- return errors.Trace(err)
- }
- }
- return nil
- }
- func (s *MariadbGTIDSet) String() string {
- sets := make([]string, 0, len(s.Sets))
- for _, set := range s.Sets {
- sets = append(sets, set.String())
- }
- sort.Strings(sets)
- return strings.Join(sets, ",")
- }
- // Encode encodes mariadb gtid set
- func (s *MariadbGTIDSet) Encode() []byte {
- var buf bytes.Buffer
- sep := ""
- for _, gtid := range s.Sets {
- buf.WriteString(sep)
- buf.WriteString(gtid.String())
- sep = ","
- }
- return buf.Bytes()
- }
- // Clone clones a mariadb gtid set
- func (s *MariadbGTIDSet) Clone() GTIDSet {
- clone := &MariadbGTIDSet{
- Sets: make(map[uint32]*MariadbGTID),
- }
- for domainID, gtid := range s.Sets {
- clone.Sets[domainID] = gtid.Clone()
- }
- return clone
- }
- // Equal returns true if two mariadb gtid set is same, otherwise return false
- func (s *MariadbGTIDSet) Equal(o GTIDSet) bool {
- other, ok := o.(*MariadbGTIDSet)
- if !ok {
- return false
- }
- if len(other.Sets) != len(s.Sets) {
- return false
- }
- for domainID, gtid := range other.Sets {
- o, ok := s.Sets[domainID]
- if !ok {
- return false
- }
- if *gtid != *o {
- return false
- }
- }
- return true
- }
- // Contain return whether one mariadb gtid set covers another mariadb gtid set
- func (s *MariadbGTIDSet) Contain(o GTIDSet) bool {
- other, ok := o.(*MariadbGTIDSet)
- if !ok {
- return false
- }
- for doaminID, gtid := range other.Sets {
- o, ok := s.Sets[doaminID]
- if !ok {
- return false
- }
- if !o.Contain(gtid) {
- return false
- }
- }
- return true
- }
|