backup.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package replication
  2. import (
  3. "context"
  4. "io"
  5. "os"
  6. "path"
  7. "time"
  8. . "github.com/go-mysql-org/go-mysql/mysql"
  9. "github.com/pingcap/errors"
  10. )
  11. // StartBackup: Like mysqlbinlog remote raw backup
  12. // Backup remote binlog from position (filename, offset) and write in backupDir
  13. func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error {
  14. if timeout == 0 {
  15. // a very long timeout here
  16. timeout = 30 * 3600 * 24 * time.Second
  17. }
  18. // Force use raw mode
  19. b.parser.SetRawMode(true)
  20. if err := os.MkdirAll(backupDir, 0755); err != nil {
  21. return errors.Trace(err)
  22. }
  23. s, err := b.StartSync(p)
  24. if err != nil {
  25. return errors.Trace(err)
  26. }
  27. var filename string
  28. var offset uint32
  29. var f *os.File
  30. defer func() {
  31. if f != nil {
  32. f.Close()
  33. }
  34. }()
  35. for {
  36. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  37. e, err := s.GetEvent(ctx)
  38. cancel()
  39. if err == context.DeadlineExceeded {
  40. return nil
  41. }
  42. if err != nil {
  43. return errors.Trace(err)
  44. }
  45. offset = e.Header.LogPos
  46. if e.Header.EventType == ROTATE_EVENT {
  47. rotateEvent := e.Event.(*RotateEvent)
  48. filename = string(rotateEvent.NextLogName)
  49. if e.Header.Timestamp == 0 || offset == 0 {
  50. // fake rotate event
  51. continue
  52. }
  53. } else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT {
  54. // FormateDescriptionEvent is the first event in binlog, we will close old one and create a new
  55. if f != nil {
  56. f.Close()
  57. }
  58. if len(filename) == 0 {
  59. return errors.Errorf("empty binlog filename for FormateDescriptionEvent")
  60. }
  61. f, err = os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
  62. if err != nil {
  63. return errors.Trace(err)
  64. }
  65. // write binlog header fe'bin'
  66. if _, err = f.Write(BinLogFileHeader); err != nil {
  67. return errors.Trace(err)
  68. }
  69. }
  70. if n, err := f.Write(e.RawData); err != nil {
  71. return errors.Trace(err)
  72. } else if n != len(e.RawData) {
  73. return errors.Trace(io.ErrShortWrite)
  74. }
  75. }
  76. }