dumper.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package dump
import (
	"bytes"
	"fmt"
	"io"
	"net"
	"os"
	"os/exec"
	"strings"

	. "github.com/go-mysql-org/go-mysql/mysql"
	"github.com/pingcap/errors"
	"github.com/siddontang/go-log/log"
)
  13. // Unlick mysqldump, Dumper is designed for parsing and syning data easily.
  14. type Dumper struct {
  15. // mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc...
  16. ExecutionPath string
  17. Addr string
  18. User string
  19. Password string
  20. Protocol string
  21. // Will override Databases
  22. Tables []string
  23. TableDB string
  24. Databases []string
  25. Where string
  26. Charset string
  27. IgnoreTables map[string][]string
  28. ExtraOptions []string
  29. ErrOut io.Writer
  30. masterDataSkipped bool
  31. maxAllowedPacket int
  32. hexBlob bool
  33. // see detectColumnStatisticsParamSupported
  34. isColumnStatisticsParamSupported bool
  35. }
  36. func NewDumper(executionPath string, addr string, user string, password string) (*Dumper, error) {
  37. if len(executionPath) == 0 {
  38. return nil, nil
  39. }
  40. path, err := exec.LookPath(executionPath)
  41. if err != nil {
  42. return nil, errors.Trace(err)
  43. }
  44. d := new(Dumper)
  45. d.ExecutionPath = path
  46. d.Addr = addr
  47. d.User = user
  48. d.Password = password
  49. d.Tables = make([]string, 0, 16)
  50. d.Databases = make([]string, 0, 16)
  51. d.Charset = DEFAULT_CHARSET
  52. d.IgnoreTables = make(map[string][]string)
  53. d.ExtraOptions = make([]string, 0, 5)
  54. d.masterDataSkipped = false
  55. d.isColumnStatisticsParamSupported = d.detectColumnStatisticsParamSupported()
  56. d.ErrOut = os.Stderr
  57. return d, nil
  58. }
  59. // New mysqldump versions try to send queries to information_schema.COLUMN_STATISTICS table which does not exist in old MySQL (<5.x).
  60. // And we got error: "Unknown table 'COLUMN_STATISTICS' in information_schema (1109)".
  61. //
  62. // mysqldump may not send this query if it is started with parameter --column-statistics.
  63. // But this parameter exists only for versions >=8.0.2 (https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-2.html).
  64. //
  65. // For environments where the version of mysql-server and mysqldump differs, we try to check this parameter and use it if available.
  66. func (d *Dumper) detectColumnStatisticsParamSupported() bool {
  67. out, err := exec.Command(d.ExecutionPath, `--help`).CombinedOutput()
  68. if err != nil {
  69. return false
  70. }
  71. return bytes.Contains(out, []byte(`--column-statistics`))
  72. }
  73. func (d *Dumper) SetCharset(charset string) {
  74. d.Charset = charset
  75. }
  76. func (d *Dumper) SetProtocol(protocol string) {
  77. d.Protocol = protocol
  78. }
  79. func (d *Dumper) SetWhere(where string) {
  80. d.Where = where
  81. }
  82. func (d *Dumper) SetExtraOptions(options []string) {
  83. d.ExtraOptions = options
  84. }
  85. func (d *Dumper) SetErrOut(o io.Writer) {
  86. d.ErrOut = o
  87. }
  88. // SkipMasterData: In some cloud MySQL, we have no privilege to use `--master-data`.
  89. func (d *Dumper) SkipMasterData(v bool) {
  90. d.masterDataSkipped = v
  91. }
  92. func (d *Dumper) SetMaxAllowedPacket(i int) {
  93. d.maxAllowedPacket = i
  94. }
  95. func (d *Dumper) SetHexBlob(v bool) {
  96. d.hexBlob = v
  97. }
  98. func (d *Dumper) AddDatabases(dbs ...string) {
  99. d.Databases = append(d.Databases, dbs...)
  100. }
  101. func (d *Dumper) AddTables(db string, tables ...string) {
  102. if d.TableDB != db {
  103. d.TableDB = db
  104. d.Tables = d.Tables[0:0]
  105. }
  106. d.Tables = append(d.Tables, tables...)
  107. }
  108. func (d *Dumper) AddIgnoreTables(db string, tables ...string) {
  109. t := d.IgnoreTables[db]
  110. t = append(t, tables...)
  111. d.IgnoreTables[db] = t
  112. }
  113. func (d *Dumper) Reset() {
  114. d.Tables = d.Tables[0:0]
  115. d.TableDB = ""
  116. d.IgnoreTables = make(map[string][]string)
  117. d.Databases = d.Databases[0:0]
  118. d.Where = ""
  119. }
  120. func (d *Dumper) Dump(w io.Writer) error {
  121. args := make([]string, 0, 16)
  122. // Common args
  123. if strings.Contains(d.Addr, "/") {
  124. args = append(args, fmt.Sprintf("--socket=%s", d.Addr))
  125. } else {
  126. seps := strings.SplitN(d.Addr, ":", 2)
  127. args = append(args, fmt.Sprintf("--host=%s", seps[0]))
  128. if len(seps) > 1 {
  129. args = append(args, fmt.Sprintf("--port=%s", seps[1]))
  130. }
  131. }
  132. args = append(args, fmt.Sprintf("--user=%s", d.User))
  133. passwordArg := fmt.Sprintf("--password=%s", d.Password)
  134. args = append(args, passwordArg)
  135. passwordArgIndex := len(args) - 1
  136. if !d.masterDataSkipped {
  137. args = append(args, "--master-data")
  138. }
  139. if d.maxAllowedPacket > 0 {
  140. // mysqldump param should be --max-allowed-packet=%dM not be --max_allowed_packet=%dM
  141. args = append(args, fmt.Sprintf("--max-allowed-packet=%dM", d.maxAllowedPacket))
  142. }
  143. if d.Protocol != "" {
  144. args = append(args, fmt.Sprintf("--protocol=%s", d.Protocol))
  145. }
  146. args = append(args, "--single-transaction")
  147. args = append(args, "--skip-lock-tables")
  148. // Disable uncessary data
  149. args = append(args, "--compact")
  150. args = append(args, "--skip-opt")
  151. args = append(args, "--quick")
  152. // We only care about data
  153. args = append(args, "--no-create-info")
  154. // Multi row is easy for us to parse the data
  155. args = append(args, "--skip-extended-insert")
  156. args = append(args, "--skip-tz-utc")
  157. if d.hexBlob {
  158. // Use hex for the binary type
  159. args = append(args, "--hex-blob")
  160. }
  161. for db, tables := range d.IgnoreTables {
  162. for _, table := range tables {
  163. args = append(args, fmt.Sprintf("--ignore-table=%s.%s", db, table))
  164. }
  165. }
  166. if len(d.Charset) != 0 {
  167. args = append(args, fmt.Sprintf("--default-character-set=%s", d.Charset))
  168. }
  169. if len(d.Where) != 0 {
  170. args = append(args, fmt.Sprintf("--where=%s", d.Where))
  171. }
  172. if len(d.ExtraOptions) != 0 {
  173. args = append(args, d.ExtraOptions...)
  174. }
  175. if d.isColumnStatisticsParamSupported {
  176. args = append(args, `--column-statistics=0`)
  177. }
  178. if len(d.Tables) == 0 && len(d.Databases) == 0 {
  179. args = append(args, "--all-databases")
  180. } else if len(d.Tables) == 0 {
  181. args = append(args, "--databases")
  182. args = append(args, d.Databases...)
  183. } else {
  184. args = append(args, d.TableDB)
  185. args = append(args, d.Tables...)
  186. // If we only dump some tables, the dump data will not have database name
  187. // which makes us hard to parse, so here we add it manually.
  188. _, err := w.Write([]byte(fmt.Sprintf("USE `%s`;\n", d.TableDB)))
  189. if err != nil {
  190. return fmt.Errorf(`could not write USE command: %w`, err)
  191. }
  192. }
  193. args[passwordArgIndex] = "--password=******"
  194. log.Infof("exec mysqldump with %v", args)
  195. args[passwordArgIndex] = passwordArg
  196. cmd := exec.Command(d.ExecutionPath, args...)
  197. cmd.Stderr = d.ErrOut
  198. cmd.Stdout = w
  199. return cmd.Run()
  200. }
  201. // DumpAndParse: Dump MySQL and parse immediately
  202. func (d *Dumper) DumpAndParse(h ParseHandler) error {
  203. r, w := io.Pipe()
  204. done := make(chan error, 1)
  205. go func() {
  206. err := Parse(r, h, !d.masterDataSkipped)
  207. _ = r.CloseWithError(err)
  208. done <- err
  209. }()
  210. err := d.Dump(w)
  211. _ = w.CloseWithError(err)
  212. err = <-done
  213. return errors.Trace(err)
  214. }