binlogstreamer.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package replication
  2. import (
  3. "context"
  4. "time"
  5. "github.com/pingcap/errors"
  6. "github.com/siddontang/go-log/log"
  7. )
  8. var (
  9. ErrNeedSyncAgain = errors.New("Last sync error or closed, try sync and get event again")
  10. ErrSyncClosed = errors.New("Sync was closed")
  11. )
  12. // BinlogStreamer gets the streaming event.
  13. type BinlogStreamer struct {
  14. ch chan *BinlogEvent
  15. ech chan error
  16. err error
  17. }
  18. // GetEvent gets the binlog event one by one, it will block until Syncer receives any events from MySQL
  19. // or meets a sync error. You can pass a context (like Cancel or Timeout) to break the block.
  20. func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error) {
  21. if s.err != nil {
  22. return nil, ErrNeedSyncAgain
  23. }
  24. select {
  25. case c := <-s.ch:
  26. return c, nil
  27. case s.err = <-s.ech:
  28. return nil, s.err
  29. case <-ctx.Done():
  30. return nil, ctx.Err()
  31. }
  32. }
  33. // GetEventWithStartTime gets the binlog event with starttime, if current binlog event timestamp smaller than specify starttime
  34. // return nil event
  35. func (s *BinlogStreamer) GetEventWithStartTime(ctx context.Context, startTime time.Time) (*BinlogEvent, error) {
  36. if s.err != nil {
  37. return nil, ErrNeedSyncAgain
  38. }
  39. startUnix := startTime.Unix()
  40. select {
  41. case c := <-s.ch:
  42. if int64(c.Header.Timestamp) >= startUnix {
  43. return c, nil
  44. }
  45. return nil, nil
  46. case s.err = <-s.ech:
  47. return nil, s.err
  48. case <-ctx.Done():
  49. return nil, ctx.Err()
  50. }
  51. }
  52. // DumpEvents dumps all left events
  53. func (s *BinlogStreamer) DumpEvents() []*BinlogEvent {
  54. count := len(s.ch)
  55. events := make([]*BinlogEvent, 0, count)
  56. for i := 0; i < count; i++ {
  57. events = append(events, <-s.ch)
  58. }
  59. return events
  60. }
  61. func (s *BinlogStreamer) close() {
  62. s.closeWithError(nil)
  63. }
  64. func (s *BinlogStreamer) closeWithError(err error) {
  65. if err == nil {
  66. err = ErrSyncClosed
  67. } else {
  68. log.Errorf("close sync with err: %v", err)
  69. }
  70. select {
  71. case s.ech <- err:
  72. default:
  73. }
  74. }
  75. func newBinlogStreamer() *BinlogStreamer {
  76. s := new(BinlogStreamer)
  77. s.ch = make(chan *BinlogEvent, 10240)
  78. s.ech = make(chan error, 4)
  79. return s
  80. }