// Package kpt turns MySQL/MariaDB binlog row events (received through
// go-mysql's canal) into SQL text, pushes that text onto a persistent queue
// (DqueueIndex) and keeps track of the last synced binlog position.
package kpt

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/binary"
	"errors"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"reflect"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/Unknwon/goconfig"
	"github.com/go-mysql-org/go-mysql/canal"
	"github.com/go-mysql-org/go-mysql/mysql"
	"github.com/go-mysql-org/go-mysql/replication"
	_ "github.com/go-sql-driver/mysql" // registers the "mysql" database/sql driver
	"github.com/recoilme/slowpoke"
	"github.com/shopspring/decimal"
	"github.com/siddontang/go-log/log"
)

var (
	host            string // MySQL host
	port            int    // MySQL port
	user            string // MySQL user, must have replication privilege
	password        string // MySQL password
	flavor          string // flavor: mysql or mariadb
	serverID        int    // unique server ID
	mysqldump       string // mysqldump execution path (relative to CurrentPath)
	tables          string // dump tables, separated by comma, will overwrite dbs
	tableDB         string // database for dump tables
	ignoreTables    string // ignore tables, must be database.table format, separated by comma
	startName       string // start sync from this binlog name
	startPos        uint   // start sync from this binlog position
	endName         string // binlog name of the most recently synced position
	endPos          uint32 // binlog offset of the most recently synced position
	heartbeatPeriod int    // master heartbeat period, seconds
	readTimeout     int    // connection read timeout, seconds

	nodename  string // name of the extra "node" column appended to generated SQL (empty disables it)
	nodevalue string // value written into the node column
	showlog   int    // when > 0, OnRow prints every generated statement

	ServiceName        string // service name
	ServiceDisplayName string // service display name
	ServiceDescription string // service description
	KafkaEnable        bool
	MqttEnable         bool
	CurrentPath        string // directory of the running executable, with a trailing "/"

	mydb   *sql.DB // global database handle
	posdb  *sql.DB // global database handle used for the binlog position table
	inicfg *goconfig.ConfigFile

	PosChan    chan Posname
	stmt       *sql.Stmt
	deletestmt *sql.Stmt
	logger     *log.Logger
	Ch         chan int

	insertstmt         string
	insertcount        int
	insertcountv       int
	Exectx             *sql.Tx
	insertlog_lasttime time.Time

	DqueueIndex     *DQueue
	DqueueIndexMqtt *DQueue
	mqttDb          *sql.DB
	KptCattleId     string
)

const (
	Type      = "mysql"
	Insert    = `INSERT INTO %s(%s) VALUES%s;`
	Update    = `UPDATE %s SET %s WHERE %s;`
	Delete    = `DELETE FROM %s WHERE %s;`
	DeleteAll = `TRUNCATE TABLE %s`
)

// dsnFormat is the go-sql-driver DSN layout shared by every connection opened
// in this file: user, password, host, port, database.
const dsnFormat = "%s:%s@tcp(%s:%d)/%s?charset=utf8"

// Posname is a binlog coordinate: file name plus offset.
type Posname struct {
	name string
	pos  uint32
}

// ReadWithSelect does a non-blocking receive from sqlch. res is false when no
// value was immediately available.
func ReadWithSelect(sqlch chan string) (x string, res bool) {
	select {
	case x = <-sqlch:
		return x, true
	default:
		return "", false
	}
}

// WriteChWithSelect tries to send sql on sqlch and gives up after 500
// microseconds. It returns true when the value was sent; on timeout it logs
// the statement and returns false.
func WriteChWithSelect(sqlch chan string, sql string) bool {
	timeout := time.NewTimer(time.Microsecond * 500)
	// Release the timer promptly when the send wins the race.
	defer timeout.Stop()

	select {
	case sqlch <- sql:
		return true
	case <-timeout.C:
		logger.Errorf("WriteChWithSelect 超时: %s", sql)
		fmt.Printf("WriteChWithSelect 超时: %s", sql)
		return false
	}
}

// GetCurrentPath returns the directory that contains the running executable,
// using forward slashes and including the trailing "/".
func GetCurrentPath() (string, error) {
	file, err := exec.LookPath(os.Args[0])
	if err != nil {
		return "", err
	}
	path, err := filepath.Abs(file)
	if err != nil {
		return "", err
	}
	if runtime.GOOS == "windows" {
		// Normalise separators so callers can append "name" directly.
		path = strings.Replace(path, "\\", "/", -1)
	}
	i := strings.LastIndex(path, "/")
	if i < 0 {
		return "", errors.New(`Can't find "/" or "\".`)
	}
	return path[:i+1], nil
}

// GetColumns returns the back-quoted column names of the event's table,
// followed by the (trimmed, unquoted) node column when nodename is set.
func GetColumns(e *canal.RowsEvent) []string {
	columns := make([]string, 0, len(e.Table.Columns)+1)
	for _, col := range e.Table.Columns {
		columns = append(columns, "`"+col.Name+"`")
	}
	if node := strings.Trim(nodename, " "); node != "" {
		columns = append(columns, node)
	}
	return columns
}

// initDB makes sure the `sqllog` database and its `tablesqllog` table exist,
// leaves mydb connected to `sqllog`, and then calls insertEmpty.
//
// Failures are printed and abort the initialisation; the function has no
// error return, so callers cannot observe them.
func initDB() {
	if err := GetDbsConnect(user, password, host, port, "mysql"); err != nil {
		fmt.Println("initDB connect mysql err:", err)
		return
	}
	if _, err := mydb.Exec("CREATE DATABASE IF NOT EXISTS `sqllog`"); err != nil {
		// Best effort: the database may already exist even if this user is
		// not allowed to create it, so keep going and let the next step decide.
		fmt.Println("initDB create database err:", err)
	}
	mydb.Close()

	if err := GetDbsConnect(user, password, host, port, "sqllog"); err != nil {
		fmt.Println("initDB connect sqllog err:", err)
		return
	}
	// IF NOT EXISTS keeps repeated initialisation quiet, so that any error
	// that does come back is a real one worth printing.
	if _, err := mydb.Exec("CREATE TABLE IF NOT EXISTS `tablesqllog` (`id` int(11) NOT NULL AUTO_INCREMENT,`sql` text DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;"); err != nil {
		fmt.Println("initDB create table err:", err)
	}
	insertEmpty()
}

// GetDbsConnect opens a connection pool for database _dbs and stores it in
// the package-level mydb (which becomes nil when sql.Open fails).
func GetDbsConnect(_user, _password, _host string, _port int, _dbs string) error {
	var err error
	mydb, err = sql.Open("mysql", fmt.Sprintf(dsnFormat, _user, _password, _host, _port, _dbs))
	if err != nil {
		return err
	}
	mydb.SetMaxOpenConns(10)
	mydb.SetMaxIdleConns(10)
	return nil
}

// GetmyDbsConnect opens and returns a new connection pool for database _dbs
// without touching any package-level handle.
func GetmyDbsConnect(_user, _password, _host string, _port int, _dbs string) (*sql.DB, error) {
	mdb, err := sql.Open("mysql", fmt.Sprintf(dsnFormat, _user, _password, _host, _port, _dbs))
	if err != nil {
		return nil, err
	}
	mdb.SetMaxOpenConns(10)
	mdb.SetMaxIdleConns(10)
	return mdb, nil
}

// insertEmpty queues one DELETE statement per configured table (walking the
// comma-separated `tables` list from last to first). The statement removes
// every row, or only this node's rows when nodename is set. It does nothing
// unless both tables and tableDB are configured.
func insertEmpty() {
	if len(tables) == 0 || len(tableDB) == 0 {
		return
	}
	subs := strings.Split(tables, ",")
	for i := len(subs) - 1; i >= 0; i-- {
		where := []string{" 1 = 1"}
		if strings.Trim(nodename, " ") != "" {
			where = append(where, nodename+" = "+"'"+nodevalue+"'")
		}
		sqls := fmt.Sprintf(Delete, subs[i], strings.Join(where, " and "))
		if err := DqueueIndex.PushOneIndex([]byte(sqls)); err != nil {
			fmt.Println("DqueueIndex.PushOneIndex([]byte(sqls)) err:", err)
		}
	}
}

// appendinsertsql appends sqlstr as one more VALUES tuple to the multi-row
// INSERT being accumulated in insertstmt, starting a new statement when
// insertcount is zero.
//
// NOTE(review): sqlstr is embedded between double quotes without escaping, so
// a statement containing `"` or `\` yields invalid SQL. insertlog uses a bound
// parameter instead and does not have this problem.
func appendinsertsql(sqlstr string) {
	if insertcount == 0 {
		insertstmt = "insert into tablesqllog(`sql`) values(\"" + sqlstr + "\")"
	} else {
		insertstmt = insertstmt + ",(\"" + sqlstr + "\")"
	}
	insertcount++
}

// insertlog stores sqlstr in `tablesqllog` inside the shared transaction
// Exectx. The transaction is committed when more than 60 seconds have passed
// since the previous commit, or once more than 10000 rows have been counted.
// mydb is opened lazily when it is nil. Errors are printed, not returned.
func insertlog(sqlstr string) {
	var err error
	if mydb == nil {
		mydb, err = sql.Open("mysql", fmt.Sprintf(dsnFormat, user, password, host, port, "sqllog"))
		if err != nil {
			fmt.Println(" sql.Open err", err)
			// mydb is nil here; carrying on would dereference it.
			return
		}
		mydb.SetMaxOpenConns(100)
		mydb.SetMaxIdleConns(10)
		if err = mydb.Ping(); err != nil {
			fmt.Println("mydb.Ping err", err)
		}
	}

	if Exectx == nil {
		Exectx, err = mydb.Begin()
		if err != nil {
			fmt.Println(err)
			return
		}
	}

	if _, err = Exectx.Exec("insert into tablesqllog(`sql`) values(?);", sqlstr); err != nil {
		fmt.Println("Exectx.Exec(", err)
		return
	}
	insertcount++

	switch {
	case time.Since(insertlog_lasttime) > 60*time.Second:
		err = Exectx.Commit()
		Exectx = nil
		if err != nil {
			fmt.Println("Exectx.Commit():", err)
		}
		insertlog_lasttime = time.Now()
	case insertcount > 10000:
		err = Exectx.Commit()
		Exectx = nil
		if err != nil {
			fmt.Println("Exectx.Commit()insertcount:", err)
		}
		insertlog_lasttime = time.Now()
		insertcount = 0
	}
}

// Exec runs _exsqlstr on mdb. A nil mdb is a no-op that returns nil.
func Exec(mdb *sql.DB, _exsqlstr string) error {
	if mdb == nil {
		return nil
	}
	_, err := mdb.Exec(_exsqlstr)
	return err
}

// ExecT runs _exsqlstr inside transaction mdb. A nil mdb is a no-op that
// returns nil.
func ExecT(mdb *sql.Tx, _exsqlstr string) error {
	if mdb == nil {
		return nil
	}
	_, err := mdb.Exec(_exsqlstr)
	return err
}

// Get returns the id and text of the `tablesqllog` row with the smallest id.
// It returns (0, "") when mdb is nil or when the row cannot be read.
func Get(mdb *sql.DB) (int, string) {
	if mdb == nil {
		return 0, ""
	}
	var (
		id   int
		text string
	)
	row := mdb.QueryRow("select `id`,`sql` from `tablesqllog` order by id limit 1")
	if err := row.Scan(&id, &text); err != nil {
		return 0, ""
	}
	return id, text
}

// delete removes every `tablesqllog` row whose id is <= id. The statement is
// prepared once on first use and cached in deletestmt.
//
// It returns an error when the statement cannot be prepared or executed.
// Note that this function shadows the builtin delete within the package.
func delete(mdb *sql.DB, id int) error {
	if deletestmt == nil {
		if mdb == nil {
			return errors.New("delete: nil database handle")
		}
		prepared, err := mdb.Prepare("delete from `tablesqllog` where id <= ? ")
		if err != nil {
			// Report the failure instead of pretending the rows were removed.
			return err
		}
		deletestmt = prepared
	}
	_, err := deletestmt.Exec(id)
	return err
}

// dmlStringEscaper escapes the characters that are special inside a
// single-quoted MySQL string literal. It relies on backslash escapes, i.e. it
// assumes the consuming server does not run with NO_BACKSLASH_ESCAPES.
var dmlStringEscaper = strings.NewReplacer(
	"\\", "\\\\",
	"'", "\\'",
	"\"", "\\\"",
	"\x00", "\\0",
	"\n", "\\n",
	"\r", "\\r",
	"\x1a", "\\Z",
)

// dmlLiteral renders one row value as SQL text. isNull is true when the value
// has no supported representation and "NULL" was produced instead.
//
// NOTE(review): any type not listed here (for example []byte or nil) is
// rendered as NULL — confirm this is intended for every column type that is
// replicated.
func dmlLiteral(value interface{}) (literal string, isNull bool) {
	switch v := value.(type) {
	case string:
		return "'" + dmlStringEscaper.Replace(v) + "'", false
	case int8, int16, int32, int, int64, uint8, uint16, uint32, uint, uint64:
		return fmt.Sprintf("%d", v), false
	case decimal.Decimal:
		return v.String(), false
	case float32:
		return strconv.FormatFloat(float64(v), 'f', -1, 32), false
	case float64:
		return strconv.FormatFloat(v, 'f', -1, 64), false
	default:
		return "NULL", true
	}
}

// dmlPKConditions builds the WHERE terms that identify row by the table's
// primary-key columns, plus the node filter when nodename is set.
//
// NOTE(review): for a table without a primary key the result holds at most
// the node filter, so the generated UPDATE/DELETE is not row-specific.
func dmlPKConditions(e *canal.RowsEvent, row []interface{}) []string {
	conds := make([]string, 0, len(e.Table.PKColumns)+1)
	for _, pk := range e.Table.PKColumns {
		column := "`" + e.Table.Columns[pk].Name + "`"
		if literal, isNull := dmlLiteral(row[pk]); isNull {
			conds = append(conds, column+" is NULL")
		} else {
			conds = append(conds, column+" = "+literal)
		}
	}
	if strings.Trim(nodename, " ") != "" {
		conds = append(conds, nodename+" = "+"'"+nodevalue+"'")
	}
	return conds
}

// dmlInsert renders all rows of an insert event as one multi-row INSERT.
func dmlInsert(e *canal.RowsEvent) string {
	columns := GetColumns(e)
	tagged := strings.Trim(nodename, " ") != ""

	tuples := make([]string, 0, len(e.Rows))
	for _, row := range e.Rows {
		values := make([]string, 0, len(row)+1)
		for _, field := range row {
			literal, _ := dmlLiteral(field)
			values = append(values, literal)
		}
		if tagged {
			values = append(values, "'"+nodevalue+"'")
		}
		tuples = append(tuples, "("+strings.Join(values, ",")+")")
	}
	return fmt.Sprintf(Insert, e.Table.Name, strings.Join(columns, ","), strings.Join(tuples, ","))
}

// dmlUpdate renders an update event as one UPDATE per (before, after) row
// pair, setting only the columns whose value changed.
func dmlUpdate(e *canal.RowsEvent) string {
	statements := make([]string, 0, len(e.Rows)/2)
	// Rows alternate: even index = before image, odd index = after image.
	for i := 0; i+1 < len(e.Rows); i += 2 {
		before, after := e.Rows[i], e.Rows[i+1]

		var sets []string
		for j := range e.Table.Columns {
			if reflect.DeepEqual(before[j], after[j]) {
				continue
			}
			literal, _ := dmlLiteral(after[j])
			sets = append(sets, "`"+e.Table.Columns[j].Name+"` = "+literal)
		}
		if len(sets) == 0 {
			// Nothing changed; "UPDATE t SET  WHERE ..." would be invalid SQL.
			continue
		}
		where := dmlPKConditions(e, before)
		statements = append(statements,
			fmt.Sprintf(Update, e.Table.Name, strings.Join(sets, ", "), strings.Join(where, " and ")))
	}
	return strings.Join(statements, "\n")
}

// dmlDelete renders a delete event as one DELETE per row.
func dmlDelete(e *canal.RowsEvent) string {
	statements := make([]string, 0, len(e.Rows))
	for _, row := range e.Rows {
		where := dmlPKConditions(e, row)
		statements = append(statements, fmt.Sprintf(Delete, e.Table.Name, strings.Join(where, " and ")))
	}
	return strings.Join(statements, "\n")
}

// GetDML converts a canal rows event into SQL text.
//
// "insert" events become a single multi-row INSERT; "update" and "delete"
// events become one statement per affected row, joined with "\n". Any other
// action yields the empty string. When nodename is configured, the node
// column/value is appended to inserts and added to every WHERE clause.
func GetDML(e *canal.RowsEvent) string {
	switch e.Action {
	case "insert":
		return dmlInsert(e)
	case "update":
		return dmlUpdate(e)
	case "delete":
		return dmlDelete(e)
	default:
		return ""
	}
}

// handler is the canal event handler that feeds DqueueIndex and records the
// binlog position.
type handler struct {
	canal.DummyEventHandler
}

// tableSelected reports whether rows of table name should be captured: every
// table when no table list is configured, otherwise only the tables named in
// `tables` (compared case-insensitively).
func tableSelected(name string) bool {
	if len(tables) == 0 || len(tableDB) == 0 {
		return true
	}
	for _, candidate := range strings.Split(tables, ",") {
		if strings.EqualFold(name, candidate) {
			return true
		}
	}
	return false
}

// OnRow converts the event to SQL and pushes it onto DqueueIndex. A push
// failure is printed and otherwise ignored, so OnRow always returns nil.
func (h *handler) OnRow(e *canal.RowsEvent) error {
	if !tableSelected(e.Table.Name) {
		return nil
	}
	dml := GetDML(e)
	if len(dml) == 0 {
		return nil
	}
	if showlog > 0 {
		fmt.Println(dml)
	}
	if err := DqueueIndex.PushOneIndex([]byte(dml)); err != nil {
		fmt.Println("DqueueIndex.PushOneIndex([]byte(sqls)) err:", err)
	}
	return nil
}

// OnRotate records the position announced by a binlog rotate event.
func (h *handler) OnRotate(roateEvent *replication.RotateEvent) error {
	setini(string(roateEvent.NextLogName), uint32(roateEvent.Position))
	return nil
}

// OnPosSynced remembers the synced position in endName/endPos and records it.
func (h *handler) OnPosSynced(p mysql.Position, G mysql.GTIDSet, g bool) error {
	endName = p.Name
	endPos = p.Pos
	setini(p.Name, p.Pos)
	return nil
}

// String returns the handler's name.
func (h *handler) String() string {
	return "TestHandler"
}

// Readini initialises the logger, loads config.ini from the executable's
// directory into the package-level settings, starts the position-writer
// goroutine, and finally overrides startName/startPos with the position
// stored in pos/pos.db when one is present.
func Readini() {
	// On error CurrentPath is empty and every path below is relative to the
	// working directory.
	CurrentPath, _ = GetCurrentPath()

	h, err := log.NewRotatingFileHandler(CurrentPath+"Consumer.log", 40*1024*1024, 10)
	if err != nil {
		fmt.Println("open Consumer.log err:", err)
	}
	logger = log.NewDefault(h)
	logger.SetLevelByName("info")

	// Assign the package-level inicfg (no := shadowing) so that runsetini,
	// started below, sees an initialised config.
	inicfg, err = goconfig.LoadConfigFile(CurrentPath + "config.ini")
	if err != nil {
		logger.Errorln("读取配置文件失败1[config.ini]", err.Error())
		return
	}

	PosChan = make(chan Posname, 10000)
	go runsetini()

	host = inicfg.MustValue("canal", "host", "127.0.0.1")
	port = inicfg.MustInt("canal", "port", 3306)
	user = inicfg.MustValue("canal", "user", "root")
	password = inicfg.MustValue("canal", "password", "root")
	flavor = inicfg.MustValue("canal", "flavor", "mariadb")
	serverID = inicfg.MustInt("canal", "serverID", 101)
	mysqldump = inicfg.MustValue("canal", "mysqldump", "mysqldump")
	tables = inicfg.MustValue("canal", "tables", "")
	tableDB = inicfg.MustValue("canal", "tableDB", "")
	ignoreTables = inicfg.MustValue("canal", "ignoreTables", "")
	startName = inicfg.MustValue("canal", "startName", "")
	startPos = uint(inicfg.MustInt("canal", "startPos", 0))
	endName = startName
	endPos = uint32(startPos)
	heartbeatPeriod = inicfg.MustInt("canal", "heartbeatPeriod", 60)
	readTimeout = inicfg.MustInt("canal", "readTimeout", 90)
	nodename = inicfg.MustValue("canal", "nodename", "")
	nodevalue = inicfg.MustValue("canal", "nodevalue", "1")
	showlog = inicfg.MustInt("canal", "showlog", 1)
	onetimerows = inicfg.MustValue("canal", "onetimerows", "1000")

	KafkaEnable = inicfg.MustBool("kafka", "kafkaEnable", false)
	kafka_host = inicfg.MustValue("kafka", "kafka_host", "127.0.0.1")
	kafka_port = inicfg.MustInt("kafka", "kafka_port", 9092)
	kafka_topic = inicfg.MustValue("kafka", "kafka_topic", "kafka_go_test")
	saslEnable = inicfg.MustBool("kafka", "saslEnable", false)
	username = inicfg.MustValue("kafka", "username", "root")
	saslpassword = inicfg.MustValue("kafka", "saslpassword", "root")
	tlsEnable = inicfg.MustBool("kafka", "tlsEnable", false)
	clientcert = inicfg.MustValue("kafka", "clientcert", "")
	clientkey = inicfg.MustValue("kafka", "clientkey", "")
	cacert = inicfg.MustValue("kafka", "cacert", "")

	MqttEnable = inicfg.MustBool("mqtt", "mqttEnable", false)
	KptCattleId = inicfg.MustValue("mqtt", "kptCattleId", "888888")
	mqtt_nodevalue = nodevalue
	mqtt_host = inicfg.MustValue("mqtt", "host", "127.0.0.1")
	mqtt_port = inicfg.MustInt("mqtt", "port", 9092)
	mqtt_path = inicfg.MustValue("mqtt", "path", "")
	mqtt_qos = inicfg.MustInt("mqtt", "qos", 0)
	mqtt_topic = inicfg.MustValue("mqtt", "topic", "mqtt_topic")
	mqtt_saslEnable = inicfg.MustBool("mqtt", "saslEnable", false)
	mqtt_username = inicfg.MustValue("mqtt", "username", "127.0.0.1")
	mqtt_saslpassword = inicfg.MustValue("mqtt", "saslpassword", "")
	mqtt_tlsEnable = inicfg.MustBool("mqtt", "tlsEnable", false)
	mqtt_clientcert = inicfg.MustValue("mqtt", "clientcert", "")
	mqtt_clientkey = inicfg.MustValue("mqtt", "clientkey", "")
	mqtt_cacert = inicfg.MustValue("mqtt", "cacert", "")
	mqtt_CleanSession = inicfg.MustBool("mqtt", "CleanSession", true)

	ServiceName = inicfg.MustValue("Service", "ServiceName", "KPTDataService")
	ServiceDisplayName = inicfg.MustValue("Service", "ServiceDisplayName", "KPTDataService")
	ServiceDescription = inicfg.MustValue("Service", "ServiceDescription", "科湃腾数据同步")

	// Use exactly the same path string as runsetini, and for Get and Close
	// alike, so the store that is opened here is the one that gets closed.
	posFile := CurrentPath + "pos/pos.db"
	defer slowpoke.Close(posFile)

	storedName, err := slowpoke.Get(posFile, []byte("startName"))
	if err != nil || string(storedName) == "" {
		logger.Error(" slowpoke.Get startName err", err, " posname :", storedName)
	} else {
		storedPos, err := slowpoke.Get(posFile, []byte("startPos"))
		if err != nil {
			logger.Error(" slowpoke.Get startPos err", err)
		} else {
			offset, err := strconv.Atoi(string(storedPos))
			if err != nil || offset == 0 {
				logger.Error("strconv.Atoi err", err, " pos :", offset)
			} else {
				// A stored, non-zero position wins over config.ini.
				startName = string(storedName)
				startPos = uint(offset)
			}
			logger.Info("read pos name ", startPos, startName)
		}
	}
	logger.Info("end pos name", startPos, startName)
}

// initPos makes sure the `sqllog` database and its `mypos` table exist,
// leaves posdb connected to `sqllog`, and returns the position stored in the
// row with id 0. When that row does not exist yet it is created and
// (0, "", nil) is returned. The _dbs parameter is not used.
func initPos(_user, _password, _host, _dbs string, _port int) (int, string, error) {
	var err error
	posdb, err = sql.Open("mysql", fmt.Sprintf(dsnFormat, _user, _password, _host, _port, "mysql"))
	if err != nil {
		return 0, "", err
	}
	posdb.SetMaxOpenConns(10)
	posdb.SetMaxIdleConns(10)
	if _, err = posdb.Exec("CREATE DATABASE IF NOT EXISTS `sqllog`"); err != nil {
		// Best effort: if the database really is missing, the query below
		// fails and that error is returned.
		fmt.Println("initPos create database err:", err)
	}
	posdb.Close()

	posdb, err = sql.Open("mysql", fmt.Sprintf(dsnFormat, _user, _password, _host, _port, "sqllog"))
	if err != nil {
		return 0, "", err
	}
	posdb.SetMaxOpenConns(10)
	posdb.SetMaxIdleConns(10)
	if _, err = posdb.Exec("CREATE TABLE IF NOT EXISTS `mypos` (`id` INT(11) NOT NULL ,`pos` INT(11) DEFAULT 0,`file` VARCHAR(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=INNODB DEFAULT CHARSET=utf8;"); err != nil {
		fmt.Println("initPos create table err:", err)
	}

	// Both columns are nullable in the table definition above.
	var (
		pos  sql.NullInt64
		file sql.NullString
	)
	err = posdb.QueryRow("select pos,file from mypos where id = 0 ").Scan(&pos, &file)
	switch {
	case err == nil:
		return int(pos.Int64), file.String, nil
	case errors.Is(err, sql.ErrNoRows):
		// First run: seed the single position row.
		_, err = posdb.Exec("insert mypos(id,pos,file) value (0,0,'')")
		return 0, "", err
	default:
		return 0, "", err
	}
}

// openini (re)loads config.ini from the executable's directory into the
// package-level inicfg.
func openini() {
	CurrentPath, _ = GetCurrentPath()
	var err error
	inicfg, err = goconfig.LoadConfigFile(CurrentPath + "config.ini")
	if err != nil {
		logger.Errorln("读取配置文件失败[config.ini]", err.Error())
		return
	}
}

// setini hands a binlog position to the runsetini goroutine. It blocks when
// PosChan is full.
func setini(name string, pos uint32) {
	PosChan <- Posname{name, pos}
}

// runsetini receives positions from PosChan and, for each one, updates the
// in-memory config and writes startName/startPos to pos/pos.db, closing the
// store after every write. It returns only if PosChan is closed.
func runsetini() {
	// CurrentPath is set by Readini before this goroutine is started.
	file := CurrentPath + "pos/pos.db"

	for posname := range PosChan {
		logger.Info("binlog pos :", posname.pos, " binlog posname :", posname.name)
		offset := strconv.Itoa(int(posname.pos))

		if inicfg != nil {
			inicfg.SetValue("canal", "startName", posname.name)
			inicfg.SetValue("canal", "startPos", offset)
		}

		if err := slowpoke.Set(file, []byte("startName"), []byte(posname.name)); err != nil {
			logger.Error(" slowpoke.Set startName err", err)
		}
		if err := slowpoke.Set(file, []byte("startPos"), []byte(offset)); err != nil {
			logger.Error(" slowpoke.Set startPos err", err)
		}
		if err := slowpoke.Close(file); err != nil {
			logger.Error(" slowpoke.Close err", err)
		}
	}
}

// writeini reloads config.ini, sets canal.startName/startPos and saves the
// file. It works on its own copy of the config, not on inicfg.
func writeini(name string, pos uint32) {
	CurrentPath, _ = GetCurrentPath()
	cfgPath := CurrentPath + "config.ini"

	cfg, err := goconfig.LoadConfigFile(cfgPath)
	if err != nil {
		logger.Errorln("读取配置文件失败[config.ini]", err.Error())
		return
	}
	cfg.SetValue("canal", "startName", name)
	cfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
	if err = goconfig.SaveConfigFile(cfg, cfgPath); err != nil {
		logger.Errorln("保存配置文件失败[config.ini]", err.Error())
	}
}

// writeini1 is an alias of writeini, kept for existing callers.
func writeini1(name string, pos uint32) {
	writeini(name, pos)
}

// writePosini stores the binlog position in row 0 of the `mypos` table.
// Errors are printed; the function has no error return.
func writePosini(name string, pos uint32) {
	println("pos", pos)
	if posdb == nil {
		fmt.Println("writePosini: posdb is not initialised")
		return
	}
	tx, err := posdb.Begin()
	if err != nil {
		fmt.Println("writePosini begin err:", err)
		return
	}
	if _, err = tx.Exec("update mypos set pos = ? , file = ? where id = 0", pos, name); err != nil {
		fmt.Println("writePosini update err:", err)
		if rbErr := tx.Rollback(); rbErr != nil {
			fmt.Println("writePosini rollback err:", rbErr)
		}
		return
	}
	if err = tx.Commit(); err != nil {
		fmt.Println("writePosini commit err:", err)
	}
}

// IntToBytes encodes n as an 8-byte big-endian int64.
func IntToBytes(n int) []byte {
	data := int64(n)
	bytebuf := bytes.NewBuffer([]byte{})
	// Writing a fixed-size value into a bytes.Buffer cannot fail.
	binary.Write(bytebuf, binary.BigEndian, data)
	return bytebuf.Bytes()
}

// BytesToInt decodes the first 8 bytes of bys as a big-endian int64. It
// returns 0 when bys holds fewer than 8 bytes (the read error is discarded).
func BytesToInt(bys []byte) int {
	bytebuff := bytes.NewBuffer(bys)
	var data int64
	binary.Read(bytebuff, binary.BigEndian, &data)
	return int(data)
}

// newConfiguredCanal creates a canal from cfg and applies the package-level
// dump filters (ignoreTables, tables/tableDB) and the event handler.
func newConfiguredCanal(cfg *canal.Config) (*canal.Canal, error) {
	c, err := canal.NewCanal(cfg)
	if err != nil {
		return nil, err
	}

	if len(ignoreTables) > 0 {
		for _, sub := range strings.Split(ignoreTables, ",") {
			// Only well-formed "database.table" entries are applied.
			if seps := strings.Split(sub, "."); len(seps) == 2 {
				c.AddDumpIgnoreTables(seps[0], seps[1])
			}
		}
	}

	if len(tables) > 0 && len(tableDB) > 0 {
		c.AddDumpTables(tableDB, strings.Split(tables, ",")...)
	} else if len(tableDB) > 0 {
		c.AddDumpDatabases(strings.Split(tableDB, ",")...)
	}

	c.SetEventHandler(&handler{})
	return c, nil
}

// RunService opens the statement queue, starts replication from
// startName/startPos, starts the Kafka or MQTT producer when enabled, and
// then blocks until a value arrives on Ch, at which point it shuts everything
// down. Readini is expected to have been called first.
//
// When the first replication run fails, the canal is rebuilt and replication
// restarts from the very beginning (empty position), after the target tables
// have been cleared through initDB/insertEmpty. The process exits with status
// 1 when the queue or a canal cannot be created.
func RunService() {
	var err error
	DqueueIndex, err = OpenIndexFile(CurrentPath+"logs", 100000, 10000, false)
	if err != nil {
		fmt.Printf("create DqueueIndex err %v", err)
		os.Exit(1)
	}

	cfg := canal.NewDefaultConfig()
	cfg.Addr = fmt.Sprintf("%s:%d", host, port)
	cfg.User = user
	cfg.Password = password
	cfg.Flavor = flavor
	cfg.UseDecimal = true
	cfg.ReadTimeout = time.Duration(readTimeout) * time.Second
	cfg.HeartbeatPeriod = time.Duration(heartbeatPeriod) * time.Second
	cfg.ServerID = uint32(serverID)
	cfg.Dump.ExecutionPath = CurrentPath + mysqldump
	cfg.Dump.DiscardErr = false
	cfg.Dump.MaxAllowedPacketMB = 500

	c, err := newConfiguredCanal(cfg)
	if err != nil {
		fmt.Printf("create canal err %v", err)
		os.Exit(1)
	}

	from := mysql.Position{
		Name: startName,
		Pos:  uint32(startPos),
	}

	ctx, cancel := context.WithCancel(context.Background())

	// active is the canal that must be closed at shutdown. It is replaced when
	// the replication goroutine rebuilds the canal, hence the mutex.
	var (
		mu     sync.Mutex
		active = c
	)

	go func() {
		if startPos == 0 {
			// No stored position: start from a clean slate.
			initDB()
			insertEmpty()
		}

		runErr := c.RunFrom(from)
		if runErr == nil || ctx.Err() != nil {
			// Clean exit, or the canal was closed because the service is
			// shutting down — do not restart.
			return
		}

		// On failure, upload everything again from the beginning.
		fmt.Printf("start canal err %v", runErr)
		logger.Errorf("start canal err %v", runErr)

		mu.Lock()
		c.Close()
		active = nil
		mu.Unlock()

		fresh, err := newConfiguredCanal(cfg)
		if err != nil {
			fmt.Printf("create canal err %v", err)
			os.Exit(1)
		}

		mu.Lock()
		if ctx.Err() != nil {
			// Shutdown started while the canal was being rebuilt.
			mu.Unlock()
			fresh.Close()
			return
		}
		active = fresh
		mu.Unlock()

		initDB()
		insertEmpty()
		if runErr = fresh.RunFrom(mysql.Position{}); runErr != nil && ctx.Err() == nil {
			logger.Errorf("start canal err second %v", runErr)
		}
	}()

	if KafkaEnable {
		go Kafka_producerDB(ctx)
	} else if MqttEnable {
		go Mqtt_producerDB(ctx)
	}

	// Block until someone signals shutdown.
	Ch = make(chan int, 1)
	<-Ch

	cancel()
	DqueueIndex.CloseIndex()
	fmt.Println("程序被关闭 ")

	mu.Lock()
	if active != nil {
		active.Close()
	}
	mu.Unlock()
}