123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
- package dump
import (
	"bytes"
	"fmt"
	"io"
	"net"
	"os"
	"os/exec"
	"sort"
	"strings"

	. "github.com/go-mysql-org/go-mysql/mysql"
	"github.com/pingcap/errors"
	"github.com/siddontang/go-log/log"
)
// Dumper drives the mysqldump command-line tool. Unlike a plain mysqldump run,
// Dumper is designed for parsing and syncing data easily: Dump asks mysqldump
// for data-only output with one INSERT statement per row.
type Dumper struct {
	// ExecutionPath is the mysqldump executable, like mysqldump or
	// /usr/bin/mysqldump, etc.
	ExecutionPath string

	// Addr is either a Unix socket path (any value containing "/") or a
	// "host[:port]" TCP address.
	Addr string

	// User is passed to mysqldump as --user.
	User string

	// Password is passed to mysqldump as --password (masked in the log line).
	Password string

	// Protocol, when non-empty, is passed to mysqldump as --protocol.
	Protocol string

	// Tables, when non-empty, restricts the dump to these tables of TableDB.
	// It overrides Databases.
	Tables []string

	// TableDB is the database that Tables belong to.
	TableDB string

	// Databases lists the databases to dump when Tables is empty. When both
	// are empty, every database is dumped.
	Databases []string

	// Where, when non-empty, is passed to mysqldump as --where.
	Where string

	// Charset, when non-empty, is passed as --default-character-set.
	Charset string

	// IgnoreTables maps a database name to the tables excluded from the dump;
	// each pair becomes a --ignore-table=db.table argument.
	IgnoreTables map[string][]string

	// ExtraOptions are appended verbatim to the mysqldump arguments.
	ExtraOptions []string

	// ErrOut receives mysqldump's standard error.
	ErrOut io.Writer

	// masterDataSkipped omits --master-data when true.
	masterDataSkipped bool

	// maxAllowedPacket, when > 0, is passed as --max-allowed-packet=<n>M.
	maxAllowedPacket int

	// hexBlob adds --hex-blob when true.
	hexBlob bool

	// see detectColumnStatisticsParamSupported
	isColumnStatisticsParamSupported bool
}
- func NewDumper(executionPath string, addr string, user string, password string) (*Dumper, error) {
- if len(executionPath) == 0 {
- return nil, nil
- }
- path, err := exec.LookPath(executionPath)
- if err != nil {
- return nil, errors.Trace(err)
- }
- d := new(Dumper)
- d.ExecutionPath = path
- d.Addr = addr
- d.User = user
- d.Password = password
- d.Tables = make([]string, 0, 16)
- d.Databases = make([]string, 0, 16)
- d.Charset = DEFAULT_CHARSET
- d.IgnoreTables = make(map[string][]string)
- d.ExtraOptions = make([]string, 0, 5)
- d.masterDataSkipped = false
- d.isColumnStatisticsParamSupported = d.detectColumnStatisticsParamSupported()
- d.ErrOut = os.Stderr
- return d, nil
- }
- // New mysqldump versions try to send queries to information_schema.COLUMN_STATISTICS table which does not exist in old MySQL (<5.x).
- // And we got error: "Unknown table 'COLUMN_STATISTICS' in information_schema (1109)".
- //
- // mysqldump may not send this query if it is started with parameter --column-statistics.
- // But this parameter exists only for versions >=8.0.2 (https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-2.html).
- //
- // For environments where the version of mysql-server and mysqldump differs, we try to check this parameter and use it if available.
- func (d *Dumper) detectColumnStatisticsParamSupported() bool {
- out, err := exec.Command(d.ExecutionPath, `--help`).CombinedOutput()
- if err != nil {
- return false
- }
- return bytes.Contains(out, []byte(`--column-statistics`))
- }
- func (d *Dumper) SetCharset(charset string) {
- d.Charset = charset
- }
- func (d *Dumper) SetProtocol(protocol string) {
- d.Protocol = protocol
- }
- func (d *Dumper) SetWhere(where string) {
- d.Where = where
- }
- func (d *Dumper) SetExtraOptions(options []string) {
- d.ExtraOptions = options
- }
- func (d *Dumper) SetErrOut(o io.Writer) {
- d.ErrOut = o
- }
- // SkipMasterData: In some cloud MySQL, we have no privilege to use `--master-data`.
- func (d *Dumper) SkipMasterData(v bool) {
- d.masterDataSkipped = v
- }
- func (d *Dumper) SetMaxAllowedPacket(i int) {
- d.maxAllowedPacket = i
- }
- func (d *Dumper) SetHexBlob(v bool) {
- d.hexBlob = v
- }
- func (d *Dumper) AddDatabases(dbs ...string) {
- d.Databases = append(d.Databases, dbs...)
- }
- func (d *Dumper) AddTables(db string, tables ...string) {
- if d.TableDB != db {
- d.TableDB = db
- d.Tables = d.Tables[0:0]
- }
- d.Tables = append(d.Tables, tables...)
- }
- func (d *Dumper) AddIgnoreTables(db string, tables ...string) {
- t := d.IgnoreTables[db]
- t = append(t, tables...)
- d.IgnoreTables[db] = t
- }
- func (d *Dumper) Reset() {
- d.Tables = d.Tables[0:0]
- d.TableDB = ""
- d.IgnoreTables = make(map[string][]string)
- d.Databases = d.Databases[0:0]
- d.Where = ""
- }
- func (d *Dumper) Dump(w io.Writer) error {
- args := make([]string, 0, 16)
- // Common args
- if strings.Contains(d.Addr, "/") {
- args = append(args, fmt.Sprintf("--socket=%s", d.Addr))
- } else {
- seps := strings.SplitN(d.Addr, ":", 2)
- args = append(args, fmt.Sprintf("--host=%s", seps[0]))
- if len(seps) > 1 {
- args = append(args, fmt.Sprintf("--port=%s", seps[1]))
- }
- }
- args = append(args, fmt.Sprintf("--user=%s", d.User))
- passwordArg := fmt.Sprintf("--password=%s", d.Password)
- args = append(args, passwordArg)
- passwordArgIndex := len(args) - 1
- if !d.masterDataSkipped {
- args = append(args, "--master-data")
- }
- if d.maxAllowedPacket > 0 {
- // mysqldump param should be --max-allowed-packet=%dM not be --max_allowed_packet=%dM
- args = append(args, fmt.Sprintf("--max-allowed-packet=%dM", d.maxAllowedPacket))
- }
- if d.Protocol != "" {
- args = append(args, fmt.Sprintf("--protocol=%s", d.Protocol))
- }
- args = append(args, "--single-transaction")
- args = append(args, "--skip-lock-tables")
- // Disable uncessary data
- args = append(args, "--compact")
- args = append(args, "--skip-opt")
- args = append(args, "--quick")
- // We only care about data
- args = append(args, "--no-create-info")
- // Multi row is easy for us to parse the data
- args = append(args, "--skip-extended-insert")
- args = append(args, "--skip-tz-utc")
- if d.hexBlob {
- // Use hex for the binary type
- args = append(args, "--hex-blob")
- }
- for db, tables := range d.IgnoreTables {
- for _, table := range tables {
- args = append(args, fmt.Sprintf("--ignore-table=%s.%s", db, table))
- }
- }
- if len(d.Charset) != 0 {
- args = append(args, fmt.Sprintf("--default-character-set=%s", d.Charset))
- }
- if len(d.Where) != 0 {
- args = append(args, fmt.Sprintf("--where=%s", d.Where))
- }
- if len(d.ExtraOptions) != 0 {
- args = append(args, d.ExtraOptions...)
- }
- if d.isColumnStatisticsParamSupported {
- args = append(args, `--column-statistics=0`)
- }
- if len(d.Tables) == 0 && len(d.Databases) == 0 {
- args = append(args, "--all-databases")
- } else if len(d.Tables) == 0 {
- args = append(args, "--databases")
- args = append(args, d.Databases...)
- } else {
- args = append(args, d.TableDB)
- args = append(args, d.Tables...)
- // If we only dump some tables, the dump data will not have database name
- // which makes us hard to parse, so here we add it manually.
- _, err := w.Write([]byte(fmt.Sprintf("USE `%s`;\n", d.TableDB)))
- if err != nil {
- return fmt.Errorf(`could not write USE command: %w`, err)
- }
- }
- args[passwordArgIndex] = "--password=******"
- log.Infof("exec mysqldump with %v", args)
- args[passwordArgIndex] = passwordArg
- cmd := exec.Command(d.ExecutionPath, args...)
- cmd.Stderr = d.ErrOut
- cmd.Stdout = w
- return cmd.Run()
- }
- // DumpAndParse: Dump MySQL and parse immediately
- func (d *Dumper) DumpAndParse(h ParseHandler) error {
- r, w := io.Pipe()
- done := make(chan error, 1)
- go func() {
- err := Parse(r, h, !d.masterDataSkipped)
- _ = r.CloseWithError(err)
- done <- err
- }()
- err := d.Dump(w)
- _ = w.CloseWithError(err)
- err = <-done
- return errors.Trace(err)
- }
|