mysql_gtid.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. package mysql
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "math"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "github.com/google/uuid"
  12. "github.com/pingcap/errors"
  13. "github.com/siddontang/go/hack"
  14. )
  15. // Like MySQL GTID Interval struct, [start, stop), left closed and right open
  16. // See MySQL rpl_gtid.h
  17. type Interval struct {
  18. // The first GID of this interval.
  19. Start int64
  20. // The first GID after this interval.
  21. Stop int64
  22. }
  23. // Interval is [start, stop), but the GTID string's format is [n] or [n1-n2], closed interval
  24. func parseInterval(str string) (i Interval, err error) {
  25. p := strings.Split(str, "-")
  26. switch len(p) {
  27. case 1:
  28. i.Start, err = strconv.ParseInt(p[0], 10, 64)
  29. i.Stop = i.Start + 1
  30. case 2:
  31. i.Start, err = strconv.ParseInt(p[0], 10, 64)
  32. if err == nil {
  33. i.Stop, err = strconv.ParseInt(p[1], 10, 64)
  34. i.Stop++
  35. }
  36. default:
  37. err = errors.Errorf("invalid interval format, must n[-n]")
  38. }
  39. if err != nil {
  40. return
  41. }
  42. if i.Stop <= i.Start {
  43. err = errors.Errorf("invalid interval format, must n[-n] and the end must >= start")
  44. }
  45. return
  46. }
  47. func (i Interval) String() string {
  48. if i.Stop == i.Start+1 {
  49. return fmt.Sprintf("%d", i.Start)
  50. } else {
  51. return fmt.Sprintf("%d-%d", i.Start, i.Stop-1)
  52. }
  53. }
  54. type IntervalSlice []Interval
  55. func (s IntervalSlice) Len() int {
  56. return len(s)
  57. }
  58. func (s IntervalSlice) Less(i, j int) bool {
  59. if s[i].Start < s[j].Start {
  60. return true
  61. } else if s[i].Start > s[j].Start {
  62. return false
  63. } else {
  64. return s[i].Stop < s[j].Stop
  65. }
  66. }
  67. func (s IntervalSlice) Swap(i, j int) {
  68. s[i], s[j] = s[j], s[i]
  69. }
  70. func (s IntervalSlice) Sort() {
  71. sort.Sort(s)
  72. }
  73. func (s IntervalSlice) Normalize() IntervalSlice {
  74. var n IntervalSlice
  75. if len(s) == 0 {
  76. return n
  77. }
  78. s.Sort()
  79. n = append(n, s[0])
  80. for i := 1; i < len(s); i++ {
  81. last := n[len(n)-1]
  82. if s[i].Start > last.Stop {
  83. n = append(n, s[i])
  84. continue
  85. } else {
  86. stop := s[i].Stop
  87. if last.Stop > stop {
  88. stop = last.Stop
  89. }
  90. n[len(n)-1] = Interval{last.Start, stop}
  91. }
  92. }
  93. return n
  94. }
  95. // Contain returns true if sub in s
  96. func (s IntervalSlice) Contain(sub IntervalSlice) bool {
  97. j := 0
  98. for i := 0; i < len(sub); i++ {
  99. for ; j < len(s); j++ {
  100. if sub[i].Start > s[j].Stop {
  101. continue
  102. } else {
  103. break
  104. }
  105. }
  106. if j == len(s) {
  107. return false
  108. }
  109. if sub[i].Start < s[j].Start || sub[i].Stop > s[j].Stop {
  110. return false
  111. }
  112. }
  113. return true
  114. }
  115. func (s IntervalSlice) Equal(o IntervalSlice) bool {
  116. if len(s) != len(o) {
  117. return false
  118. }
  119. for i := 0; i < len(s); i++ {
  120. if s[i].Start != o[i].Start || s[i].Stop != o[i].Stop {
  121. return false
  122. }
  123. }
  124. return true
  125. }
  126. func (s IntervalSlice) Compare(o IntervalSlice) int {
  127. if s.Equal(o) {
  128. return 0
  129. } else if s.Contain(o) {
  130. return 1
  131. } else {
  132. return -1
  133. }
  134. }
  135. // Refer http://dev.mysql.com/doc/refman/5.6/en/replication-gtids-concepts.html
  136. type UUIDSet struct {
  137. SID uuid.UUID
  138. Intervals IntervalSlice
  139. }
  140. func ParseUUIDSet(str string) (*UUIDSet, error) {
  141. str = strings.TrimSpace(str)
  142. sep := strings.Split(str, ":")
  143. if len(sep) < 2 {
  144. return nil, errors.Errorf("invalid GTID format, must UUID:interval[:interval]")
  145. }
  146. var err error
  147. s := new(UUIDSet)
  148. if s.SID, err = uuid.Parse(sep[0]); err != nil {
  149. return nil, errors.Trace(err)
  150. }
  151. // Handle interval
  152. for i := 1; i < len(sep); i++ {
  153. if in, err := parseInterval(sep[i]); err != nil {
  154. return nil, errors.Trace(err)
  155. } else {
  156. s.Intervals = append(s.Intervals, in)
  157. }
  158. }
  159. s.Intervals = s.Intervals.Normalize()
  160. return s, nil
  161. }
  162. func NewUUIDSet(sid uuid.UUID, in ...Interval) *UUIDSet {
  163. s := new(UUIDSet)
  164. s.SID = sid
  165. s.Intervals = in
  166. s.Intervals = s.Intervals.Normalize()
  167. return s
  168. }
  169. func (s *UUIDSet) Contain(sub *UUIDSet) bool {
  170. if s.SID != sub.SID {
  171. return false
  172. }
  173. return s.Intervals.Contain(sub.Intervals)
  174. }
  175. func (s *UUIDSet) Bytes() []byte {
  176. var buf bytes.Buffer
  177. buf.WriteString(s.SID.String())
  178. for _, i := range s.Intervals {
  179. buf.WriteString(":")
  180. buf.WriteString(i.String())
  181. }
  182. return buf.Bytes()
  183. }
  184. func (s *UUIDSet) AddInterval(in IntervalSlice) {
  185. s.Intervals = append(s.Intervals, in...)
  186. s.Intervals = s.Intervals.Normalize()
  187. }
  188. func (s *UUIDSet) MinusInterval(in IntervalSlice) {
  189. var n IntervalSlice
  190. in = in.Normalize()
  191. i, j := 0, 0
  192. var minuend Interval
  193. var subtrahend Interval
  194. for i < len(s.Intervals) {
  195. if minuend.Stop != s.Intervals[i].Stop { // `i` changed?
  196. minuend = s.Intervals[i]
  197. }
  198. if j < len(in) {
  199. subtrahend = in[j]
  200. } else {
  201. subtrahend = Interval{math.MaxInt64, math.MaxInt64}
  202. }
  203. if minuend.Stop <= subtrahend.Start {
  204. // no overlapping
  205. n = append(n, minuend)
  206. i++
  207. } else if minuend.Start >= subtrahend.Stop {
  208. // no overlapping
  209. j++
  210. } else {
  211. if minuend.Start < subtrahend.Start && minuend.Stop <= subtrahend.Stop {
  212. n = append(n, Interval{minuend.Start, subtrahend.Start})
  213. i++
  214. } else if minuend.Start >= subtrahend.Start && minuend.Stop > subtrahend.Stop {
  215. minuend = Interval{subtrahend.Stop, minuend.Stop}
  216. j++
  217. } else if minuend.Start >= subtrahend.Start && minuend.Stop <= subtrahend.Stop {
  218. // minuend is completely removed
  219. i++
  220. } else if minuend.Start < subtrahend.Start && minuend.Stop > subtrahend.Stop {
  221. n = append(n, Interval{minuend.Start, subtrahend.Start})
  222. minuend = Interval{subtrahend.Stop, minuend.Stop}
  223. j++
  224. } else {
  225. panic("should never be here")
  226. }
  227. }
  228. }
  229. s.Intervals = n.Normalize()
  230. }
  231. func (s *UUIDSet) String() string {
  232. return hack.String(s.Bytes())
  233. }
  234. func (s *UUIDSet) encode(w io.Writer) {
  235. b, _ := s.SID.MarshalBinary()
  236. _, _ = w.Write(b)
  237. n := int64(len(s.Intervals))
  238. _ = binary.Write(w, binary.LittleEndian, n)
  239. for _, i := range s.Intervals {
  240. _ = binary.Write(w, binary.LittleEndian, i.Start)
  241. _ = binary.Write(w, binary.LittleEndian, i.Stop)
  242. }
  243. }
  244. func (s *UUIDSet) Encode() []byte {
  245. var buf bytes.Buffer
  246. s.encode(&buf)
  247. return buf.Bytes()
  248. }
  249. func (s *UUIDSet) decode(data []byte) (int, error) {
  250. if len(data) < 24 {
  251. return 0, errors.Errorf("invalid uuid set buffer, less 24")
  252. }
  253. pos := 0
  254. var err error
  255. if s.SID, err = uuid.FromBytes(data[0:16]); err != nil {
  256. return 0, err
  257. }
  258. pos += 16
  259. n := int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
  260. pos += 8
  261. if len(data) < int(16*n)+pos {
  262. return 0, errors.Errorf("invalid uuid set buffer, must %d, but %d", pos+int(16*n), len(data))
  263. }
  264. s.Intervals = make([]Interval, 0, n)
  265. var in Interval
  266. for i := int64(0); i < n; i++ {
  267. in.Start = int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
  268. pos += 8
  269. in.Stop = int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
  270. pos += 8
  271. s.Intervals = append(s.Intervals, in)
  272. }
  273. return pos, nil
  274. }
  275. func (s *UUIDSet) Decode(data []byte) error {
  276. n, err := s.decode(data)
  277. if n != len(data) {
  278. return errors.Errorf("invalid uuid set buffer, must %d, but %d", n, len(data))
  279. }
  280. return err
  281. }
  282. func (s *UUIDSet) Clone() *UUIDSet {
  283. clone := new(UUIDSet)
  284. copy(clone.SID[:], s.SID[:])
  285. clone.Intervals = s.Intervals.Normalize()
  286. return clone
  287. }
  288. type MysqlGTIDSet struct {
  289. Sets map[string]*UUIDSet
  290. }
  291. var _ GTIDSet = &MysqlGTIDSet{}
  292. func ParseMysqlGTIDSet(str string) (GTIDSet, error) {
  293. s := new(MysqlGTIDSet)
  294. s.Sets = make(map[string]*UUIDSet)
  295. if str == "" {
  296. return s, nil
  297. }
  298. sp := strings.Split(str, ",")
  299. //todo, handle redundant same uuid
  300. for i := 0; i < len(sp); i++ {
  301. if set, err := ParseUUIDSet(sp[i]); err != nil {
  302. return nil, errors.Trace(err)
  303. } else {
  304. s.AddSet(set)
  305. }
  306. }
  307. return s, nil
  308. }
  309. func DecodeMysqlGTIDSet(data []byte) (*MysqlGTIDSet, error) {
  310. s := new(MysqlGTIDSet)
  311. if len(data) < 8 {
  312. return nil, errors.Errorf("invalid gtid set buffer, less 4")
  313. }
  314. n := int(binary.LittleEndian.Uint64(data))
  315. s.Sets = make(map[string]*UUIDSet, n)
  316. pos := 8
  317. for i := 0; i < n; i++ {
  318. set := new(UUIDSet)
  319. if n, err := set.decode(data[pos:]); err != nil {
  320. return nil, errors.Trace(err)
  321. } else {
  322. pos += n
  323. s.AddSet(set)
  324. }
  325. }
  326. return s, nil
  327. }
  328. func (s *MysqlGTIDSet) AddSet(set *UUIDSet) {
  329. if set == nil {
  330. return
  331. }
  332. sid := set.SID.String()
  333. o, ok := s.Sets[sid]
  334. if ok {
  335. o.AddInterval(set.Intervals)
  336. } else {
  337. s.Sets[sid] = set
  338. }
  339. }
  340. func (s *MysqlGTIDSet) MinusSet(set *UUIDSet) {
  341. if set == nil {
  342. return
  343. }
  344. sid := set.SID.String()
  345. uuidSet, ok := s.Sets[sid]
  346. if ok {
  347. uuidSet.MinusInterval(set.Intervals)
  348. if uuidSet.Intervals == nil {
  349. delete(s.Sets, sid)
  350. }
  351. }
  352. }
  353. func (s *MysqlGTIDSet) Update(GTIDStr string) error {
  354. gtidSet, err := ParseMysqlGTIDSet(GTIDStr)
  355. if err != nil {
  356. return err
  357. }
  358. for _, uuidSet := range gtidSet.(*MysqlGTIDSet).Sets {
  359. s.AddSet(uuidSet)
  360. }
  361. return nil
  362. }
  363. func (s *MysqlGTIDSet) Add(addend MysqlGTIDSet) error {
  364. for _, uuidSet := range addend.Sets {
  365. s.AddSet(uuidSet)
  366. }
  367. return nil
  368. }
  369. func (s *MysqlGTIDSet) Minus(subtrahend MysqlGTIDSet) error {
  370. for _, uuidSet := range subtrahend.Sets {
  371. s.MinusSet(uuidSet)
  372. }
  373. return nil
  374. }
  375. func (s *MysqlGTIDSet) Contain(o GTIDSet) bool {
  376. sub, ok := o.(*MysqlGTIDSet)
  377. if !ok {
  378. return false
  379. }
  380. for key, set := range sub.Sets {
  381. o, ok := s.Sets[key]
  382. if !ok {
  383. return false
  384. }
  385. if !o.Contain(set) {
  386. return false
  387. }
  388. }
  389. return true
  390. }
  391. func (s *MysqlGTIDSet) Equal(o GTIDSet) bool {
  392. sub, ok := o.(*MysqlGTIDSet)
  393. if !ok {
  394. return false
  395. }
  396. if len(sub.Sets) != len(s.Sets) {
  397. return false
  398. }
  399. for key, set := range sub.Sets {
  400. o, ok := s.Sets[key]
  401. if !ok {
  402. return false
  403. }
  404. if !o.Intervals.Equal(set.Intervals) {
  405. return false
  406. }
  407. }
  408. return true
  409. }
  410. func (s *MysqlGTIDSet) String() string {
  411. // there is only one element in gtid set
  412. if len(s.Sets) == 1 {
  413. for _, set := range s.Sets {
  414. return set.String()
  415. }
  416. }
  417. // sort multi set
  418. var buf bytes.Buffer
  419. sets := make([]string, 0, len(s.Sets))
  420. for _, set := range s.Sets {
  421. sets = append(sets, set.String())
  422. }
  423. sort.Strings(sets)
  424. sep := ""
  425. for _, set := range sets {
  426. buf.WriteString(sep)
  427. buf.WriteString(set)
  428. sep = ","
  429. }
  430. return hack.String(buf.Bytes())
  431. }
  432. func (s *MysqlGTIDSet) Encode() []byte {
  433. var buf bytes.Buffer
  434. _ = binary.Write(&buf, binary.LittleEndian, uint64(len(s.Sets)))
  435. for i := range s.Sets {
  436. s.Sets[i].encode(&buf)
  437. }
  438. return buf.Bytes()
  439. }
  440. func (gtid *MysqlGTIDSet) Clone() GTIDSet {
  441. clone := &MysqlGTIDSet{
  442. Sets: make(map[string]*UUIDSet),
  443. }
  444. for sid, uuidSet := range gtid.Sets {
  445. clone.Sets[sid] = uuidSet.Clone()
  446. }
  447. return clone
  448. }