| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920 | package kptimport (	"bytes"	"context"	"database/sql"	"encoding/binary"	"errors"	"fmt"	"github.com/Unknwon/goconfig"	"github.com/go-mysql-org/go-mysql/canal"	"github.com/go-mysql-org/go-mysql/mysql"	"github.com/go-mysql-org/go-mysql/replication"	_ "github.com/go-sql-driver/mysql" //导入mysql驱动包	"github.com/recoilme/slowpoke"	"github.com/shopspring/decimal"	"github.com/siddontang/go-log/log"	"os"	"os/exec"	"path/filepath"	"reflect"	"runtime"	"strconv"	"strings"	"time")var (	host     string //"MySQL host")	port     int    //"MySQL port")	user     string // "MySQL user, must have replication privilege")	password string // "MySQL password")	flavor       string // "Flavor: mysql or mariadb")	serverID     int    // "Unique Server ID")	mysqldump    string // "mysqldump execution path")	tables       string // "dump tables, seperated by comma, will overwrite dbs")	tableDB      string // "database for dump tables")	ignoreTables string // "ignore tables, must be database.table format, separated by comma")	startName string // "start sync from binlog name")	startPos  uint   // "start sync from binlog position of")	endName string // "start sync from binlog name")	endPos  uint32   // "start sync from binlog position of")	heartbeatPeriod int // "master heartbeat period")	readTimeout     int // "connection read timeout")	nodename  string //nodename  key name	nodevalue string //nodevalue value	showlog int	ServiceName        string //服务显示名称	ServiceDisplayName string //服务名称	ServiceDescription string //服务描述	KafkaEnable        bool	MqttEnable         bool	CurrentPath        string	mydb               *sql.DB // 全局的数据库操作句柄	posdb              *sql.DB // 全局的数据库操作句柄	inicfg				   *goconfig.ConfigFile	PosChan			   chan Posname	stmt               *sql.Stmt	deletestmt         *sql.Stmt	logger             *log.Logger	Ch                 chan int	insertstmt         string = ""	insertcount        int    = 0	insertcountv       int    = 0	Exectx             *sql.Tx	insertlog_lasttime time.Time	DqueueIndex  		 *DQueue	DqueueIndexMqtt     *DQueue	mqttDb		  *sql.DB	KptCattleId		string)const (	Type      = "mysql"	Insert    = `INSERT INTO %s(%s) VALUES%s;`	Update    = `UPDATE %s SET %s WHERE %s;`	Delete    = `DELETE FROM %s WHERE %s;`	DeleteAll = `TRUNCATE TABLE %s`)type Posname struct {	name string	pos uint32}func ReadWithSelect(sqlch chan string) (x string, res bool) {	select {	case x = <-sqlch:		return x, true	default:		return "", false	}}func WriteChWithSelect(sqlch chan string, sql string) bool {	timeout := time.NewTimer(time.Microsecond * 500)	select {	case sqlch <- sql:		return true	case <-timeout.C:		{			logger.Errorf("WriteChWithSelect 超时: %s", sql)			fmt.Printf("WriteChWithSelect 超时: %s", sql)			return false		}	}}func GetCurrentPath() (string, error) {	file, err := exec.LookPath(os.Args[0])	if err != nil {		return "", err	}	path, err := filepath.Abs(file)	if err != nil {		return "", err	}	//fmt.Println("path111:", path)	if runtime.GOOS == "windows" {		path = strings.Replace(path, "\\", "/", -1)	}	//fmt.Println("path222:", path)	i := strings.LastIndex(path, "/")	if i < 0 {		return "", errors.New(`Can't find "/" or "\".`)	}	//fmt.Println("path333:", path)	return string(path[0 : i+1]), nil}func GetColumns(e *canal.RowsEvent) []string {	var Columns []string	for _, value := range e.Table.Columns {		Columns = append(Columns, "`"+value.Name+"`")	}	if strings.Trim(nodename, " ") != "" {		Columns = append(Columns, strings.Trim(nodename, " "))	}	return Columns}func initDB() {	err := GetDbsConnect(user, password, host, port, "mysql")	_, err = mydb.Exec("CREATE DATABASE IF NOT EXISTS `sqllog`")	mydb.Close()	err = GetDbsConnect(user, password, host, port, "sqllog")	if err == nil {		_, _ = mydb.Exec("CREATE TABLE `tablesqllog` (`id` int(11) NOT NULL AUTO_INCREMENT,`sql` text DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;")		insertEmpty()	}}func GetDbsConnect(_user, _password, _host string, _port int, _dbs string) error {	connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, _dbs)	var err error	mydb, err = sql.Open("mysql", connecting)	if err == nil {		mydb.SetMaxOpenConns(10)		mydb.SetMaxIdleConns(10)	} else {		return err	}	return nil}func GetmyDbsConnect(_user, _password, _host string, _port int, _dbs string) (*sql.DB, error) {	connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, _dbs)	mdb, err := sql.Open("mysql", connecting)	if err == nil {		mdb.SetMaxOpenConns(10)		mdb.SetMaxIdleConns(10)	} else {		return nil, err	}	return mdb, nil}func insertEmpty() {	if len(tables) > 0 && len(tableDB) > 0 {		subs := strings.Split(tables, ",")		for i := len(subs) - 1; i >= 0; i-- {			var where []string			where = append(where, " 1 = 1")			if strings.Trim(nodename, " ") != "" {				where = append(where, nodename+" = "+"'"+nodevalue+"'")			}			sqls := fmt.Sprintf(Delete, subs[i], strings.Join(where, " and "))			//insertlog(sqls)			//savesqltofile(sqls)			err := DqueueIndex.PushOneIndex([]byte(sqls))			if err != nil{				fmt.Println("DqueueIndex.PushOneIndex([]byte(sqls)) err:",err)			}		}		//for _, value := range subs {		//}	}}//func () {//	for i:= 0 ; i<100000000;i++ {//		select {//		case <-c2 ://			fmt.Println("c2退出")//			return//		default://			err =dqueuePop.PushOneIndex(//				[]byte("a"+strconv.Itoa(i)),//			)//			if err != nil{//				fmt.Println("dqueuePop.PushOneIndex err",err)////			}else {//				fmt.Println("dqueuePop.PushOneIndex sucess","a"+strconv.Itoa(i))//			}//			//time.Sleep(1*time.Millisecond)//		}//		//err =dqueue.Push([][]byte{//		//	[]byte("a"+strconv.Itoa(i)),//		//	[]byte("b"+strconv.Itoa(i)),//		//})//	}//}func appendinsertsql(sqlstr string) {	if insertcount == 0 {		insertstmt = "insert into tablesqllog(`sql`) values(\"" + sqlstr + "\")"	} else {		insertstmt = insertstmt + ",(\"" + sqlstr + "\")"	}	insertcount++}func insertlog(sqlstr string) {	var err error	if mydb == nil {		connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", user, password, host, port, "sqllog")		mydb, err = sql.Open("mysql", connecting)		if err != nil {			fmt.Println(" sql.Open err", err)		}		mydb.SetMaxOpenConns(100)		mydb.SetMaxIdleConns(10)		mydb.Ping()	}	if Exectx == nil {		Exectx, err = mydb.Begin()		if err != nil {			fmt.Println(err)		}	}	if Exectx != nil {		_, err = Exectx.Exec("insert into tablesqllog(`sql`) values(?);", sqlstr)		if err != nil {			fmt.Println("Exectx.Exec(", err)		} else {			insertcount++			if time.Since(insertlog_lasttime) > 60*time.Second {				err := Exectx.Commit()				Exectx = nil				if err != nil {					fmt.Println("Exectx.Commit():", err)				}				insertlog_lasttime = time.Now() // get current time			} else {				if insertcount > 10000 {					err := Exectx.Commit()					Exectx = nil					if err != nil {						fmt.Println("Exectx.Commit()insertcount:", err)					}					insertlog_lasttime = time.Now() // get current time					insertcount = 0				}			}		}	}}func Exec(mdb *sql.DB, _exsqlstr string) error {	if mdb != nil {		_, err := mdb.Exec(_exsqlstr)		if err != nil {			return err		}	}	return nil}func ExecT(mdb *sql.Tx, _exsqlstr string) error {	if mdb != nil {		_, err := mdb.Exec(_exsqlstr)		if err != nil {			return err		}	}	return nil}func Get(mdb *sql.DB) (int, string) {	var id int	var sql string	sql = ""	if mdb != nil {		//参数绑定,可以避免sql注入		rows := mdb.QueryRow("select `id`,`sql` from  `tablesqllog` order by id limit 1")		err := rows.Scan(&id, &sql)		if err != nil {			return 0, ""		}	}	return id, sql}func delete(mdb *sql.DB, id int) error {	if deletestmt == nil {		deletestmt, _ = mdb.Prepare("delete from `tablesqllog` where id <= ? ")	}	if deletestmt != nil {		_, err := deletestmt.Exec(id)		if err != nil {			return err		}	}	return nil}func GetDML(e *canal.RowsEvent) string {	query := ""	if e.Action == "insert" {		Columns := GetColumns(e)		var rows []string		for _, value := range e.Rows {			var frows []string			for _, fvalue := range value {				switch v := fvalue.(type) {				case string:					frows = append(frows, "'"+v+"'")				case int8, int16, int32, int, int64:					strV := fmt.Sprintf("%d", v)					frows = append(frows, strV)				case decimal.Decimal:					frows = append(frows, v.String())				case float32:					frows = append(frows, strconv.FormatFloat(float64(v), 'f', -1, 32))				case float64:					frows = append(frows, strconv.FormatFloat(v, 'f', -1, 64))				default:					frows = append(frows, "NULL")				}			}			if strings.Trim(nodename, " ") != "" {				frows = append(frows, "'"+nodevalue+"'")			}			rows = append(rows, "("+strings.Join(frows, ",")+")")		}		query = fmt.Sprintf(Insert, e.Table.Name, strings.Join(Columns, ","), strings.Join(rows, ","))	} else if e.Action == "update" {		var rows []string		var where []string		var uset []string		for i, value := range e.Rows {			if (i % 2) == 0 {				where = append([]string{})				for _, pk := range e.Table.PKColumns {					switch v := value[pk].(type) {					case string:						where = append(where, "`"+e.Table.Columns[pk].Name+"` = '"+v+"'")					case int8, int16, int32, int, int64:						strV := fmt.Sprintf("%d", v)						where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strV)					case decimal.Decimal:						where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+v.String())					case float32:						where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))					case float64:						where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))					default:						where = append(where, "`"+e.Table.Columns[pk].Name+"` is NULL")					}				}				if strings.Trim(nodename, " ") != "" {					where = append(where, nodename+" = "+"'"+nodevalue+"'")				}			} else {				uset = append([]string{})				for j, _ := range e.Table.Columns {					xx := reflect.DeepEqual(e.Rows[i-1][j], value[j])					if !xx {						switch v := value[j].(type) {						case string:							uset = append(uset, "`"+e.Table.Columns[j].Name+"` = '"+v+"'")						case int8, int16, int32, int, int64:							strV := fmt.Sprintf("%d", v)							uset = append(uset, "`"+e.Table.Columns[j].Name+"` = "+strV)						case decimal.Decimal:							uset = append(uset, "`"+e.Table.Columns[j].Name+"` = "+v.String())						case float32:							uset = append(uset, "`"+e.Table.Columns[j].Name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))						case float64:							uset = append(uset, "`"+e.Table.Columns[j].Name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))						default:							uset = append(uset, "`"+e.Table.Columns[j].Name+"` = NULL")						}					}				}				rows = append(rows, fmt.Sprintf(Update, e.Table.Name, strings.Join(uset, ", "), strings.Join(where, " and ")))			}		}		query = strings.Join(rows, "\n")	} else if e.Action == "delete" {		var rows []string		for _, value := range e.Rows {			var where []string			for _, pk := range e.Table.PKColumns {				switch v := value[pk].(type) {				case string:					where = append(where, "`"+e.Table.Columns[pk].Name+"` = '"+v+"'")				case int8, int16, int32, int, int64:					strV := fmt.Sprintf("%d", v)					where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strV)				case float32:					where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))				case float64:					where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))				default:					where = append(where, "`"+e.Table.Columns[pk].Name+"` is NULL")				}			}			if strings.Trim(nodename, " ") != "" {				where = append(where, nodename+" = "+"'"+nodevalue+"'")			}			rows = append(rows, fmt.Sprintf(Delete, e.Table.Name, strings.Join(where, " and ")))		}		query = strings.Join(rows, "\n")	}	return query}type handler struct {	canal.DummyEventHandler}func (h *handler) OnRow(e *canal.RowsEvent) error {	needsave := 0	if len(tables) > 0 && len(tableDB) > 0 {		subs := strings.Split(tables, ",")		for _, value := range subs {			if strings.ToLower(e.Table.Name) == strings.ToLower(value) {				needsave = 1				break			}		}	} else {		needsave = 1	}	if needsave > 0 {		s := GetDML(e)		if len(s) > 0 {			if showlog > 0 {				fmt.Println(s)			}			//insertlog(s)			//sqlDeque.PushBack(s)			//mysqlQueue.Enqueue(s)			//WriteChWithSelect(mysqlch, s)			//savesqltofile(s)			err := DqueueIndex.PushOneIndex([]byte(s))			if err != nil{				fmt.Println("DqueueIndex.PushOneIndex([]byte(sqls)) err:",err)			}		}	}	return nil}func (h *handler) OnRotate(roateEvent *replication.RotateEvent) error {	//writePosini(string(roateEvent.NextLogName), uint32(roateEvent.Position))	setini(string(roateEvent.NextLogName), uint32(roateEvent.Position))	return nil}func (h *handler) OnPosSynced(p mysql.Position, G mysql.GTIDSet, g bool) error {	//	fmt.Println(p.Name + " |  " + strconv.Itoa(int(p.Pos)))	endName = p.Name	endPos = p.Pos	//writePosini(p.Name, p.Pos)	setini(p.Name, p.Pos)	return nil}func (h *handler) String() string {	return "TestHandler"}func Readini() {	CurrentPath, _ = GetCurrentPath()	h, _ := log.NewRotatingFileHandler(CurrentPath+"Consumer.log", 40*1024*1024, 10)	logger = log.NewDefault(h)	logger.SetLevelByName("info")	inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")	if err != nil {		logger.Errorln("读取配置文件失败1[config.ini]",err.Error())		return	}	PosChan = make(chan Posname,10000)     go runsetini()	host = inicfg.MustValue("canal", "host", "127.0.0.1")	port = inicfg.MustInt("canal", "port", 3306)	user = inicfg.MustValue("canal", "user", "root")	password = inicfg.MustValue("canal", "password", "root")	flavor = inicfg.MustValue("canal", "flavor", "mariadb")	serverID = inicfg.MustInt("canal", "serverID", 101)	mysqldump = inicfg.MustValue("canal", "mysqldump", "mysqldump")	tables = inicfg.MustValue("canal", "tables", "")	tableDB = inicfg.MustValue("canal", "tableDB", "")	ignoreTables = inicfg.MustValue("canal", "ignoreTables", "")	startName = inicfg.MustValue("canal", "startName", "")	startPos = uint(inicfg.MustInt("canal", "startPos", 0))	endName = startName	endPos = uint32(startPos)	heartbeatPeriod = inicfg.MustInt("canal", "heartbeatPeriod", 60)	readTimeout = inicfg.MustInt("canal", "readTimeout", 90)	nodename = inicfg.MustValue("canal", "nodename", "")	nodevalue = inicfg.MustValue("canal", "nodevalue", "1")	showlog = inicfg.MustInt("canal", "showlog", 1)	onetimerows = inicfg.MustValue("canal", "onetimerows", "1000")	KafkaEnable = inicfg.MustBool("kafka", "kafkaEnable", false)	kafka_host = inicfg.MustValue("kafka", "kafka_host", "127.0.0.1")	kafka_port = inicfg.MustInt("kafka", "kafka_port", 9092)	kafka_topic = inicfg.MustValue("kafka", "kafka_topic", "kafka_go_test")	saslEnable = inicfg.MustBool("kafka", "saslEnable", false)	username = inicfg.MustValue("kafka", "username", "root")	saslpassword = inicfg.MustValue("kafka", "saslpassword", "root")	tlsEnable = inicfg.MustBool("kafka", "tlsEnable", false)	clientcert = inicfg.MustValue("kafka", "clientcert", "")	clientkey = inicfg.MustValue("kafka", "clientkey", "")	cacert = inicfg.MustValue("kafka", "cacert", "")	MqttEnable = inicfg.MustBool("mqtt", "mqttEnable", false)	KptCattleId = inicfg.MustValue("mqtt", "kptCattleId", "888888")	mqtt_nodevalue = nodevalue	mqtt_host = inicfg.MustValue("mqtt", "host", "127.0.0.1")	mqtt_port = inicfg.MustInt("mqtt", "port", 9092)	mqtt_path = inicfg.MustValue("mqtt", "path", "")	mqtt_qos = inicfg.MustInt("mqtt", "qos", 0)	mqtt_topic = inicfg.MustValue("mqtt", "topic", "mqtt_topic")	mqtt_saslEnable = inicfg.MustBool("mqtt", "saslEnable", false)	mqtt_username = inicfg.MustValue("mqtt", "username", "127.0.0.1")	mqtt_saslpassword = inicfg.MustValue("mqtt", "saslpassword", "")	mqtt_tlsEnable = inicfg.MustBool("mqtt", "tlsEnable", false)	mqtt_clientcert = inicfg.MustValue("mqtt", "clientcert", "")	mqtt_clientkey = inicfg.MustValue("mqtt", "clientkey", "")	mqtt_cacert = inicfg.MustValue("mqtt", "cacert", "")	mqtt_CleanSession = inicfg.MustBool("mqtt", "CleanSession", true)	ServiceName = inicfg.MustValue("Service", "ServiceName", "KPTDataService")	ServiceDisplayName = inicfg.MustValue("Service", "ServiceDisplayName", "KPTDataService")	ServiceDescription = inicfg.MustValue("Service", "ServiceDescription", "科湃腾数据同步")	openini()	file := CurrentPath +"/pos/pos.db"	defer slowpoke.Close(CurrentPath +"pos/pos.db")	keyN := []byte("startName")	keyP := []byte("startPos")	resN, err := slowpoke.Get(file, keyN)	if err != nil || string(resN) == "" {		logger.Error(" slowpoke.Get startName err",err,"  posname :",resN)	}else{		res, err := slowpoke.Get(file, keyP)		if err != nil {			logger.Error(" slowpoke.Get startPos err",err)		}else{			str,err :=  strconv.Atoi(string(res))			if err != nil || str == 0 {				logger.Error("strconv.Atoi err",err,"  pos :",str)			}else {				startName = string(resN)				startPos = uint(str)			}			logger.Info("read pos name ", startPos, startName)		}	}	logger.Info("end pos name", startPos, startName)}func initPos(_user, _password, _host,_dbs string, _port int)(int,string,error){	connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, "mysql")	var err error	posdb, err = sql.Open("mysql", connecting)	if err == nil {		posdb.SetMaxOpenConns(10)		posdb.SetMaxIdleConns(10)	} else {		return 0,"",err	}	_, err = posdb.Exec("CREATE DATABASE IF NOT EXISTS `sqllog`")	posdb.Close()	connecting = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, "sqllog")	posdb, err = sql.Open("mysql", connecting)	if err == nil {		posdb.SetMaxOpenConns(10)		posdb.SetMaxIdleConns(10)	} else {		return 0,"",err	}	_, _ = posdb.Exec("CREATE TABLE IF NOT EXISTS `mypos` (`id` INT(11) NOT NULL ,`pos` INT(11) DEFAULT 0,`file` VARCHAR(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=INNODB DEFAULT CHARSET=utf8;")	rows := posdb.QueryRow("select pos,file from mypos where id = 0 ")		pos := 0		file := ""		if err := rows.Scan(&pos, &file); err != nil {			_,err = posdb.Exec("insert mypos(id,pos,file) value (0,0,'')")		}	return pos,file,err}func openini() {	CurrentPath, _ = GetCurrentPath()	var err error	inicfg, err = goconfig.LoadConfigFile(CurrentPath + "config.ini")	if err != nil {		logger.Errorln("读取配置文件失败[config.ini]",err.Error())		return	}}func setini(name string, pos uint32) {	posname := Posname{name,pos}	PosChan <- posname}func runsetini() {	for{		posname := <-PosChan		logger.Info("binlog pos :",posname.pos," binlog posname :",posname.name)		CurrentPath, _ = GetCurrentPath()		inicfg.SetValue("canal", "startName", posname.name)		inicfg.SetValue("canal", "startPos", strconv.Itoa(int(posname.pos)))		file := CurrentPath +"pos/pos.db"		err :=slowpoke.Set(file, []byte("startName"), []byte(posname.name))		if err != nil {			logger.Error(" slowpoke.Set startName err",err)		}		err  = slowpoke.Set(file, []byte("startPos"),[]byte(strconv.Itoa(int(posname.pos))))		if err != nil {			logger.Error(" slowpoke.Set startPos err",err)		}		err = slowpoke.Close(CurrentPath +"pos/pos.db")		if err != nil {			logger.Error(" slowpoke.Close err",err)		}	}}//func saveini() {//////}func writeini(name string, pos uint32) {	CurrentPath, _ = GetCurrentPath()	inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")	if err != nil {		logger.Infoln("读取配置文件失败[config.ini]")		return	}	inicfg.SetValue("canal", "startName", name)	inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))	err = goconfig.SaveConfigFile(inicfg, CurrentPath+"config.ini")	if err != nil {		logger.Infoln("保存配置文件失败[config.ini]")		return	}}func writeini1(name string, pos uint32) {	CurrentPath, _ = GetCurrentPath()	inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")	if err != nil {		logger.Infoln("读取配置文件失败[config.ini]")		return	}	inicfg.SetValue("canal", "startName", name)	inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))	err = goconfig.SaveConfigFile(inicfg, CurrentPath+"config.ini")	if err != nil {		logger.Infoln("保存配置文件失败[config.ini]")		return	}}func writePosini(name string, pos uint32) {	println("pos",pos)	tx,_ := posdb.Begin() 	_,err :=tx.Exec("update mypos set  pos = ? , file = ? where id = 0",pos,name) 	if err != nil{ 		tx.Rollback()	}else {		tx.Commit()	}	//CurrentPath, _ = GetCurrentPath()	//inicfg, err := goconfig.LoadConfigFile(CurrentPath + "configPos.ini")	//if err != nil {	//	logger.Errorln("读取配置文件失败[configPos.ini]")	//	return	//}	//inicfg.SetValue("canal", "startName", name)	//inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))	//err = goconfig.SaveConfigFile(inicfg, CurrentPath+"configPos.ini")	//if err != nil {	//	logger.Errorln("保存配置文件失败[configPos.ini]")	//	return	//}}func IntToBytes(n int) []byte {	data := int64(n)	bytebuf := bytes.NewBuffer([]byte{})	binary.Write(bytebuf, binary.BigEndian, data)	return bytebuf.Bytes()}func BytesToInt(bys []byte) int {	bytebuff := bytes.NewBuffer(bys)	var data int64	binary.Read(bytebuff, binary.BigEndian, &data)	return int(data)}func RunService() {	//Readini()	var err error	DqueueIndex,err = OpenIndexFile(CurrentPath+ "logs",100000,10000,false)	if err != nil {		fmt.Printf("create DqueueIndex err %v", err)		os.Exit(1)	}	cfg := canal.NewDefaultConfig()	cfg.Addr =  fmt.Sprintf("%s:%d", host, port)	cfg.User = user	cfg.Password = password	cfg.Flavor = flavor	cfg.UseDecimal = true	cfg.ReadTimeout = time.Duration(readTimeout) * time.Second	cfg.HeartbeatPeriod = time.Duration(heartbeatPeriod) * time.Second	cfg.ServerID = uint32(serverID)	cfg.Dump.ExecutionPath = CurrentPath + mysqldump	cfg.Dump.DiscardErr = false	cfg.Dump.MaxAllowedPacketMB = 500	//cfg.Dump.TableDB = "tmrwatch"	c, err := canal.NewCanal(cfg)	if err != nil {		fmt.Printf("create canal err %v", err)		os.Exit(1)	}	if len(ignoreTables) > 0 {		subs := strings.Split(ignoreTables, ",")		for _, sub := range subs {			if seps := strings.Split(sub, "."); len(seps) == 2 {				c.AddDumpIgnoreTables(seps[0], seps[1])			}		}	}	if len(tables) > 0 && len(tableDB) > 0 {		subs := strings.Split(tables, ",")		c.AddDumpTables(tableDB, subs...)	} else if len(tableDB) > 0 {		subs := strings.Split(tableDB, ",")		c.AddDumpDatabases(subs...)	}	c.SetEventHandler(&handler{})	startPos_ := mysql.Position{		Name: startName,		Pos:  uint32(startPos),	}	ctx := context.Background()	ctx, cancel := context.WithCancel(ctx)	go func() {		if !(startPos > 0) {			initDB()			//SqlBitcask.DeleteAll()			//SqlBitcask.Merge()			insertEmpty()		}		err = c.RunFrom(startPos_)		if err != nil {   //出错后,将重头开始上传			fmt.Printf("start canal err %v", err)			logger.Error("start canal err %v", err)			c.Close()			c, err := canal.NewCanal(cfg)			if err != nil {				fmt.Printf("create canal err %v", err)				os.Exit(1)			}			if len(ignoreTables) > 0 {				subs := strings.Split(ignoreTables, ",")				for _, sub := range subs {					if seps := strings.Split(sub, "."); len(seps) == 2 {						c.AddDumpIgnoreTables(seps[0], seps[1])					}				}			}			if len(tables) > 0 && len(tableDB) > 0 {				subs := strings.Split(tables, ",")				c.AddDumpTables(tableDB, subs...)			} else if len(tableDB) > 0 {				subs := strings.Split(tableDB, ",")				c.AddDumpDatabases(subs...)			}			c.SetEventHandler(&handler{})			ctx := context.Background()			ctx, cancel = context.WithCancel(ctx)			startPos_ = mysql.Position{				Name: "",				Pos:  0,			}			initDB()			insertEmpty()			err = c.RunFrom(startPos_)			logger.Error("start canal err second %v", err)		}	}()	//监听	//ctxListen := context.Background()	//ctxListen, cancelListen := context.WithCancel(ctxListen)	//go func() {	//	ticker := time.Tick(60 * time.Second)	//	for  {	//		select {	//		case <-ticker:	//			if time.Since(insertlog_lasttime) > 60*time.Second  && Exectx != nil {	//	//				err := Exectx.Commit()	//				Exectx = nil	//				if err != nil {	//					fmt.Println("Exectx.Commit()ticker err:", err)	//				}	//				insertlog_lasttime = time.Now()	//			}	//			case  <-ctxListen.Done():	//				if  Exectx != nil {	//	//					err := Exectx.Commit()	//					Exectx = nil	//					if err != nil {	//						fmt.Println("Exectx.Commit()ticker err:", err)	//					}	//					insertlog_lasttime = time.Now()	//					fmt.Println("Exectx.Commit()tickerEXIT:", err)	//					return	//				}	//	//		}	//	}	//	//}()	if KafkaEnable {		go Kafka_producerDB(ctx)	}else if MqttEnable {		go Mqtt_producerDB(ctx)	}	Ch = make(chan int, 1)	<-Ch	//cancelListen()	cancel()	DqueueIndex.CloseIndex()	//setini(endName,endPos)	//writeini(endName,endPos)	fmt.Println("程序被关闭 ")	c.Close()}
 |