12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264 |
- package kpt
- import (
- "bytes"
- "context"
- "database/sql"
- "encoding/binary"
- "errors"
- "fmt"
- "os"
- "os/exec"
- "path/filepath"
- "reflect"
- "runtime"
- "strconv"
- "strings"
- "time"
- "github.com/Unknwon/goconfig"
- "github.com/go-mysql-org/go-mysql/canal"
- "github.com/go-mysql-org/go-mysql/mysql"
- "github.com/go-mysql-org/go-mysql/replication"
- "github.com/go-mysql-org/go-mysql/schema"
- _ "github.com/go-sql-driver/mysql" //导入mysql驱动包
- "github.com/recoilme/slowpoke"
- "github.com/shopspring/decimal"
- "github.com/siddontang/go-log/log"
- )
- var (
- host string //"MySQL host")
- port int //"MySQL port")
- user string // "MySQL user, must have replication privilege")
- password string // "MySQL password")
- cfg *canal.Config
- flavor string // "Flavor: mysql or mariadb")
- serverID int // "Unique Server ID")
- mysqldump string // "mysqldump execution path")
- tables string // "dump tables, seperated by comma, will overwrite dbs")
- tableDB string // "database for dump tables")
- ignoreTables string // "ignore tables, must be database.table format, separated by comma")
- startName string // "start sync from binlog name")
- startPos uint // "start sync from binlog position of")
- endName string // "start sync from binlog name")
- endPos uint32 // "start sync from binlog position of")
- heartbeatPeriod int // "master heartbeat period")
- readTimeout int // "connection read timeout")
- nodename string //nodename key name
- nodevalue string //nodevalue value
- showlog int
- ServiceName string //服务显示名称
- ServiceDisplayName string //服务名称
- ServiceDescription string //服务描述
- KafkaEnable bool
- MqttEnable bool
- CurrentPath string
- mydb *sql.DB // 全局的数据库操作句柄
- posdb *sql.DB // 全局的数据库操作句柄
- inicfg *goconfig.ConfigFile
- PosChan chan Posname
- stmt *sql.Stmt
- deletestmt *sql.Stmt
- logger *log.Logger
- Ch chan int
- insertstmt string = ""
- insertcount int = 0
- insertcountv int = 0
- Exectx *sql.Tx
- insertlog_lasttime time.Time
- DqueueIndex *DQueue
- DqueueIndexMqtt *DQueue
- mqttDb *sql.DB
- KptCattleId string
- old bool
- tableField map[string]map[string]string
- pastureid string
- i int
- )
// SQL statement templates (fmt verbs) used when turning binlog rows into text.
const (
	// Type is the fixed string "mysql"; it is not referenced in this file.
	Type = "mysql"

	// Insert takes: table, column list, value tuples.
	Insert = "INSERT INTO %s(%s) VALUES%s;"
	// Update takes: table, assignments, condition.
	Update = "UPDATE %s SET %s WHERE %s;"
	// Delete takes: table, condition.
	Delete = "DELETE FROM %s WHERE %s;"
	// DeleteAll takes: table.
	DeleteAll = "TRUNCATE TABLE %s"
)
// Posname is a binlog coordinate: the binlog file name plus the offset in it.
// Values are sent over PosChan by setini and persisted by runsetini.
type Posname struct {
	name string // binlog file name
	pos  uint32 // offset within that file
}
// ReadWithSelect performs a non-blocking receive on sqlch.
//
// It returns the received value and true when something could be received
// immediately (this includes the zero value delivered by a closed channel),
// and "" and false when the receive would block.
func ReadWithSelect(sqlch chan string) (x string, res bool) {
	select {
	case msg := <-sqlch:
		x, res = msg, true
	default:
		// Nothing ready: keep the zero-valued results.
	}
	return x, res
}
- func WriteChWithSelect(sqlch chan string, sql string) bool {
- timeout := time.NewTimer(time.Microsecond * 500)
- select {
- case sqlch <- sql:
- return true
- case <-timeout.C:
- {
- logger.Errorf("WriteChWithSelect 超时: %s", sql)
- fmt.Printf("WriteChWithSelect 超时: %s", sql)
- return false
- }
- }
- }
// GetCurrentPath returns the absolute directory of the running executable,
// always using "/" as separator and always ending in "/".
//
// The executable is located with exec.LookPath(os.Args[0]). An error is
// returned when it cannot be found, when the path cannot be made absolute, or
// when the resulting path contains no separator at all.
func GetCurrentPath() (string, error) {
	located, err := exec.LookPath(os.Args[0])
	if err != nil {
		return "", err
	}

	abs, err := filepath.Abs(located)
	if err != nil {
		return "", err
	}

	// Normalise Windows separators so the rest of the package can
	// concatenate paths with "/".
	if runtime.GOOS == "windows" {
		abs = strings.Replace(abs, "\\", "/", -1)
	}

	cut := strings.LastIndex(abs, "/")
	if cut < 0 {
		return "", errors.New(`Can't find "/" or "\".`)
	}
	return abs[:cut+1], nil
}
- func GetColumns(e *canal.RowsEvent) []string {
- var Columns []string
- for _, value := range e.Table.Columns {
- Columns = append(Columns, "`"+value.Name+"`")
- }
- if strings.Trim(nodename, " ") != "" {
- Columns = append(Columns, strings.Trim(nodename, " "))
- }
- return Columns
- }
- func initDB() {
- err := GetDbsConnect(user, password, host, port, "mysql")
- _, err = mydb.Exec("CREATE DATABASE IF NOT EXISTS `sqllog`")
- mydb.Close()
- err = GetDbsConnect(user, password, host, port, "sqllog")
- if err == nil {
- _, _ = mydb.Exec("CREATE TABLE `tablesqllog` (`id` int(11) NOT NULL AUTO_INCREMENT,`sql` text DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;")
- insertEmpty()
- }
- }
- func GetDbsConnect(_user, _password, _host string, _port int, _dbs string) error {
- connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, _dbs)
- var err error
- mydb, err = sql.Open("mysql", connecting)
- if err == nil {
- mydb.SetMaxOpenConns(10)
- mydb.SetMaxIdleConns(10)
- } else {
- return err
- }
- return nil
- }
// GetmyDbsConnect opens and returns a new MySQL handle for the given
// credentials and schema without touching any global state. The returned pool
// is limited to 10 open and 10 idle connections. On failure it returns a nil
// handle together with the error from sql.Open.
func GetmyDbsConnect(_user, _password, _host string, _port int, _dbs string) (*sql.DB, error) {
	dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, _dbs)

	handle, err := sql.Open("mysql", dsn)
	if err != nil {
		return nil, err
	}

	handle.SetMaxOpenConns(10)
	handle.SetMaxIdleConns(10)
	return handle, nil
}
- func insertEmpty() {
- if len(tables) > 0 && len(tableDB) > 0 {
- subs := strings.Split(tables, ",")
- for i := len(subs) - 1; i >= 0; i-- {
- var where []string
- where = append(where, " 1 = 1")
- if strings.Trim(nodename, " ") != "" {
- where = append(where, nodename+" = "+"'"+nodevalue+"'")
- }
- sqls := fmt.Sprintf(Delete, subs[i], strings.Join(where, " and "))
- //insertlog(sqls)
- //savesqltofile(sqls)
- err := DqueueIndex.PushOneIndex([]byte(sqls))
- if err != nil {
- fmt.Println("DqueueIndex.PushOneIndex([]byte(sqls)) err:", err)
- }
- }
- //for _, value := range subs {
- //}
- }
- }
- //func () {
- // for i:= 0 ; i<100000000;i++ {
- // select {
- // case <-c2 :
- // fmt.Println("c2退出")
- // return
- // default:
- // err =dqueuePop.PushOneIndex(
- // []byte("a"+strconv.Itoa(i)),
- // )
- // if err != nil{
- // fmt.Println("dqueuePop.PushOneIndex err",err)
- //
- // }else {
- // fmt.Println("dqueuePop.PushOneIndex sucess","a"+strconv.Itoa(i))
- // }
- // //time.Sleep(1*time.Millisecond)
- // }
- // //err =dqueue.Push([][]byte{
- // // []byte("a"+strconv.Itoa(i)),
- // // []byte("b"+strconv.Itoa(i)),
- // //})
- // }
- //}
- func appendinsertsql(sqlstr string) {
- if insertcount == 0 {
- insertstmt = "insert into tablesqllog(`sql`) values(\"" + sqlstr + "\")"
- } else {
- insertstmt = insertstmt + ",(\"" + sqlstr + "\")"
- }
- insertcount++
- }
- func insertlog(sqlstr string) {
- var err error
- if mydb == nil {
- connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", user, password, host, port, "sqllog")
- mydb, err = sql.Open("mysql", connecting)
- if err != nil {
- fmt.Println(" sql.Open err", err)
- }
- mydb.SetMaxOpenConns(100)
- mydb.SetMaxIdleConns(10)
- mydb.Ping()
- }
- if Exectx == nil {
- Exectx, err = mydb.Begin()
- if err != nil {
- fmt.Println(err)
- }
- }
- if Exectx != nil {
- _, err = Exectx.Exec("insert into tablesqllog(`sql`) values(?);", sqlstr)
- if err != nil {
- fmt.Println("Exectx.Exec(", err)
- } else {
- insertcount++
- if time.Since(insertlog_lasttime) > 60*time.Second {
- err := Exectx.Commit()
- Exectx = nil
- if err != nil {
- fmt.Println("Exectx.Commit():", err)
- }
- insertlog_lasttime = time.Now() // get current time
- } else {
- if insertcount > 10000 {
- err := Exectx.Commit()
- Exectx = nil
- if err != nil {
- fmt.Println("Exectx.Commit()insertcount:", err)
- }
- insertlog_lasttime = time.Now() // get current time
- insertcount = 0
- }
- }
- }
- }
- }
// Exec runs _exsqlstr on mdb and returns the execution error, if any.
// A nil handle is treated as a no-op and reports success.
func Exec(mdb *sql.DB, _exsqlstr string) error {
	if mdb == nil {
		return nil
	}
	_, err := mdb.Exec(_exsqlstr)
	return err
}
// ExecT runs _exsqlstr inside the transaction mdb and returns the execution
// error, if any. A nil transaction is treated as a no-op and reports success.
func ExecT(mdb *sql.Tx, _exsqlstr string) error {
	if mdb == nil {
		return nil
	}
	_, err := mdb.Exec(_exsqlstr)
	return err
}
// Get returns the id and SQL text of the row with the smallest id in
// `tablesqllog`. It returns 0 and "" when mdb is nil, when the table is empty,
// or when the row cannot be read.
func Get(mdb *sql.DB) (int, string) {
	if mdb == nil {
		return 0, ""
	}

	var (
		id        int
		statement string
	)
	row := mdb.QueryRow("select `id`,`sql` from `tablesqllog` order by id limit 1")
	if err := row.Scan(&id, &statement); err != nil {
		return 0, ""
	}
	return id, statement
}
- func delete(mdb *sql.DB, id int) error {
- if deletestmt == nil {
- deletestmt, _ = mdb.Prepare("delete from `tablesqllog` where id <= ? ")
- }
- if deletestmt != nil {
- _, err := deletestmt.Exec(id)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func GetDML(e *canal.RowsEvent) string {
- ColumnsOld := e.Table.Columns
- data := make(map[string]string, 0)
- if _, ok := tableField[e.Table.Name]; ok {
- data = tableField[e.Table.Name]
- }
- if !old {
- for _, v := range e.Table.Columns {
- if strings.Index(v.Name, "pmid") > -1 {
- return ""
- }
- }
- }
- updateMap := make(map[string]string, 0)
- deleteField := []string{}
- if e.Action == "insert" {
- if len(data) > 0 {
- Columns := make([]schema.TableColumn, 0)
- for _, v := range e.Table.Columns {
- if _, ok := data[v.Name]; ok {
- v.Name = data[v.Name]
- }
- Columns = append(Columns, v)
- }
- for datak, datav := range data {
- if datak == "deleteField" {
- continue
- }
- key := datak
- for _, v := range Columns {
- if key == v.Name || datav == v.Name {
- key = ""
- break
- }
- }
- if key != "" {
- Columns = append(Columns, schema.TableColumn{
- Name: key,
- })
- e.Rows[0] = append(e.Rows[0], datav)
- }
- }
- slist := []string{}
- for _, v := range Columns {
- slist = append(slist, v.Name)
- }
- if _, ok := data["deleteField"]; ok {
- deleteFields := strings.Split(data["deleteField"], ",")
- rowindexList := []int{}
- for _, field := range deleteFields {
- index := -1
- for k, v := range Columns {
- if v.Name == field {
- index = k
- rowindexList = append(rowindexList, index)
- break
- }
- }
- }
- // rows := e.Rows[0]
- var rowList [][]interface{}
- for _, row := range e.Rows {
- var rows []interface{}
- for i, r := range row {
- exist := false
- for _, index := range rowindexList {
- if index == i {
- exist = true
- break
- }
- }
- if !exist {
- rows = append(rows, r)
- }
- }
- rowList = append(rowList, rows)
- }
- e.Rows = rowList
- // for k, _ := range e.Rows {
- // e.Rows[k] = rows
- // }
- // for _, field := range rowindexList {
- var columnsList []schema.TableColumn
- for i, v := range Columns {
- exist := false
- for _, index := range rowindexList {
- if index == i {
- exist = true
- break
- }
- }
- if !exist {
- columnsList = append(columnsList, v)
- }
- }
- e.Table.Columns = columnsList
- // }
- // if _, ok := data["deleteField"]; ok {
- // rows := e.Rows[0]
- // deleteFields := strings.Split(data["deleteField"], ",")
- // index := -1
- // for k, v := range Columns {
- // if v.Name == field {
- // index = k
- // }
- // }
- // if index == 0 {
- // Columns = Columns[index+1:]
- // rows = rows[index+1:]
- // } else if index == len(Columns)-1 {
- // Columns = Columns[:index]
- // rows = rows[:index]
- // } else if index > 0 {
- // var arr1 []schema.TableColumn
- // arr1 = append(arr1, Columns[:index]...)
- // arr1 = append(arr1, Columns[index+1:]...)
- // Columns = arr1
- // var arr2 []interface{}
- // arr2 = append(arr2, rows[:index]...)
- // arr2 = append(arr2, rows[index+1:]...)
- // rows = arr2
- // }
- }
- // for k, _ := range e.Rows {
- // e.Rows[k] = rows
- // }
- }
- } else {
- for k, v := range data {
- if k != "deleteField" {
- updateMap[k] = v
- } else {
- deleteField = strings.Split(v, ",")
- }
- }
- }
- query := ""
- if e.Action == "insert" {
- Columns := GetColumns(e)
- var rows []string
- var ids []string
- var idindex int
- for i, column := range Columns {
- if column == "id" {
- idindex = i
- break
- }
- }
- for _, value := range e.Rows {
- exist := false
- var frows []string
- for index, fvalue := range value {
- switch v := fvalue.(type) {
- case string:
- frows = append(frows, "'"+v+"'")
- case int8, int16, int32, int, int64:
- strV := fmt.Sprintf("%d", v)
- frows = append(frows, strV)
- case decimal.Decimal:
- frows = append(frows, v.String())
- case float32:
- frows = append(frows, strconv.FormatFloat(float64(v), 'f', -1, 32))
- case float64:
- frows = append(frows, strconv.FormatFloat(v, 'f', -1, 64))
- default:
- frows = append(frows, "NULL")
- }
- if index == idindex {
- if len(frows) > 0 {
- for _, id := range ids {
- if id == frows[len(frows)-1] {
- exist = true
- break
- }
- }
- if !exist {
- ids = append(ids, frows[len(frows)-1])
- }
- }
- }
- }
- if exist {
- continue
- }
- if strings.Trim(nodename, " ") != "" {
- frows = append(frows, "'"+nodevalue+"'")
- }
- rows = append(rows, "("+strings.Join(frows, ",")+")")
- }
- query = fmt.Sprintf(Insert, e.Table.Name, strings.Join(Columns, ","), strings.Join(rows, ","))
- } else if e.Action == "update" {
- var rows []string
- var where []string
- var uset []string
- for i, value := range e.Rows {
- if (i % 2) == 0 {
- where = append([]string{})
- for _, pk := range e.Table.PKColumns {
- name := e.Table.Columns[pk].Name
- if !old {
- if _, ok := updateMap[e.Table.Columns[pk].Name]; ok {
- name = updateMap[e.Table.Columns[pk].Name]
- }
- }
- isExist := false
- for _, fname := range deleteField {
- if fname == name {
- isExist = true
- break
- }
- }
- if isExist {
- continue
- }
- switch v := value[pk].(type) {
- case string:
- where = append(where, "`"+name+"` = '"+v+"'")
- case int8, int16, int32, int, int64:
- strV := fmt.Sprintf("%d", v)
- where = append(where, "`"+name+"` = "+strV)
- case decimal.Decimal:
- where = append(where, "`"+name+"` = "+v.String())
- case float32:
- where = append(where, "`"+name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
- case float64:
- where = append(where, "`"+name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
- default:
- where = append(where, "`"+name+"` is NULL")
- }
- }
- if strings.Trim(nodename, " ") != "" {
- where = append(where, nodename+" = "+"'"+nodevalue+"'")
- }
- // isExist := false
- // for _, w := range where {
- // if strings.Index(w, "pastureid") >= 0 {
- // isExist = true
- // break
- // }
- // }
- // if old && !isExist {
- // where = append(where, " pastureid"+" = "+"'"+pastureid+"'")
- // } else if !isExist {
- // where = append(where, " pasturecode"+" = "+"'"+pastureid+"'")
- // }
- } else {
- uset = append([]string{})
- for j, _ := range e.Table.Columns {
- xx := reflect.DeepEqual(e.Rows[i-1][j], value[j])
- if !xx {
- name := e.Table.Columns[j].Name
- if !old {
- if _, ok := updateMap[e.Table.Columns[j].Name]; ok {
- name = updateMap[e.Table.Columns[j].Name]
- }
- }
- isExist := false
- for _, fname := range deleteField {
- if fname == name {
- isExist = true
- break
- }
- }
- if isExist {
- continue
- }
- switch v := value[j].(type) {
- case string:
- uset = append(uset, "`"+name+"` = '"+v+"'")
- case int8, int16, int32, int, int64:
- strV := fmt.Sprintf("%d", v)
- uset = append(uset, "`"+name+"` = "+strV)
- case decimal.Decimal:
- uset = append(uset, "`"+name+"` = "+v.String())
- case float32:
- uset = append(uset, "`"+name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
- case float64:
- uset = append(uset, "`"+name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
- default:
- uset = append(uset, "`"+name+"` = NULL")
- }
- }
- }
- if len(uset) == 0 {
- return ""
- }
- rows = append(rows, fmt.Sprintf(Update, e.Table.Name, strings.Join(uset, ", "), strings.Join(where, " and ")))
- }
- }
- query = strings.Join(rows, "\n")
- } else if e.Action == "delete" {
- var rows []string
- for _, value := range e.Rows {
- var where []string
- for _, pk := range e.Table.PKColumns {
- name := e.Table.Columns[pk].Name
- if !old {
- if _, ok := updateMap[e.Table.Columns[pk].Name]; ok {
- name = updateMap[e.Table.Columns[pk].Name]
- }
- }
- isExist := false
- for _, fname := range deleteField {
- if fname == name {
- isExist = true
- break
- }
- }
- if isExist {
- continue
- }
- switch v := value[pk].(type) {
- case string:
- where = append(where, "`"+e.Table.Columns[pk].Name+"` = '"+v+"'")
- case int8, int16, int32, int, int64:
- strV := fmt.Sprintf("%d", v)
- where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strV)
- case float32:
- where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
- case float64:
- where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
- default:
- where = append(where, "`"+e.Table.Columns[pk].Name+"` is NULL")
- }
- }
- if strings.Trim(nodename, " ") != "" {
- where = append(where, nodename+" = "+"'"+nodevalue+"'")
- }
- // isExist := false
- // for _, w := range where {
- // if strings.Index(w, "pastureid") >= 0 {
- // isExist = true
- // break
- // }
- // }
- // if !old {
- // where = append(where, " pastureid"+" = "+"'"+pastureid+"'")
- // } else if isExist {
- // where = append(where, " pasturecode"+" = "+"'"+pastureid+"'")
- // }
- rows = append(rows, fmt.Sprintf(Delete, e.Table.Name, strings.Join(where, " and ")))
- }
- query = strings.Join(rows, "\n")
- }
- e.Table.Columns = ColumnsOld
- return query
- }
- type handler struct {
- canal.DummyEventHandler
- }
// OpenFile opens filename for reading and writing.
//
// A missing file is created (via os.Create). An existing file is opened in
// append mode, so writes go to its end. Whether the file existed is printed
// to stdout.
func OpenFile(filename string) (*os.File, error) {
	if _, err := os.Stat(filename); os.IsNotExist(err) {
		fmt.Println("文件不存在")
		return os.Create(filename)
	}
	fmt.Println("文件存在")
	// O_APPEND on its own implies O_RDONLY, which made every write to the
	// returned file fail; request read/write access explicitly.
	return os.OpenFile(filename, os.O_RDWR|os.O_APPEND, 0666)
}
- func (h *handler) OnRow(e *canal.RowsEvent) error {
- needsave := 0
- if len(tables) > 0 && len(tableDB) > 0 {
- subs := strings.Split(tables, ",")
- for _, value := range subs {
- if strings.ToLower(e.Table.Name) == strings.ToLower(value) {
- needsave = 1
- break
- }
- }
- } else {
- needsave = 1
- }
- if needsave > 0 {
- s := GetDML(e)
- if len(s) > 0 {
- if showlog > 0 {
- // fmt.Println(s)
- }
- if strings.Index(s, "downloadedplan") > 0 {
- // fmt.Println(s)
- }
- //insertlog(s)
- //sqlDeque.PushBack(s)
- //mysqlQueue.Enqueue(s)
- //WriteChWithSelect(mysqlch, s)
- //savesqltofile(s)
- err := DqueueIndex.PushOneIndex([]byte(s))
- if err != nil {
- fmt.Println("DqueueIndex.PushOneIndex([]byte(sqls)) err:", err)
- }
- }
- }
- return nil
- }
- func (h *handler) OnRotate(roateEvent *replication.RotateEvent) error {
- //writePosini(string(roateEvent.NextLogName), uint32(roateEvent.Position))
- setini(string(roateEvent.NextLogName), uint32(roateEvent.Position))
- return nil
- }
- func (h *handler) OnPosSynced(p mysql.Position, G mysql.GTIDSet, g bool) error {
- // fmt.Println(p.Name + " | " + strconv.Itoa(int(p.Pos)))
- endName = p.Name
- endPos = p.Pos
- //writePosini(p.Name, p.Pos)
- setini(p.Name, p.Pos)
- return nil
- }
- func (h *handler) String() string {
- return "TestHandler"
- }
- func Readini() {
- CurrentPath, _ = GetCurrentPath()
- h, _ := log.NewRotatingFileHandler(CurrentPath+"Consumer.log", 40*1024*1024, 10)
- logger = log.NewDefault(h)
- logger.SetLevelByName("info")
- inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
- if err != nil {
- logger.Errorln("读取配置文件失败1[config.ini]", err.Error())
- return
- }
- PosChan = make(chan Posname, 10000)
- go runsetini()
- host = inicfg.MustValue("canal", "host", "127.0.0.1")
- port = inicfg.MustInt("canal", "port", 3306)
- user = inicfg.MustValue("canal", "user", "root")
- password = inicfg.MustValue("canal", "password", "root")
- flavor = inicfg.MustValue("canal", "flavor", "mariadb")
- serverID = inicfg.MustInt("canal", "serverID", 101)
- mysqldump = inicfg.MustValue("canal", "mysqldump", "mysqldump")
- tables = inicfg.MustValue("canal", "tables", "")
- tableDB = inicfg.MustValue("canal", "tableDB", "")
- ignoreTables = inicfg.MustValue("canal", "ignoreTables", "")
- startName = inicfg.MustValue("canal", "startName", "")
- startPos = uint(inicfg.MustInt("canal", "startPos", 0))
- endName = startName
- endPos = uint32(startPos)
- heartbeatPeriod = inicfg.MustInt("canal", "heartbeatPeriod", 60)
- readTimeout = inicfg.MustInt("canal", "readTimeout", 90)
- nodename = inicfg.MustValue("canal", "nodename", "")
- nodevalue = inicfg.MustValue("canal", "nodevalue", "1")
- showlog = inicfg.MustInt("canal", "showlog", 1)
- onetimerows = inicfg.MustValue("canal", "onetimerows", "1000")
- KafkaEnable = inicfg.MustBool("kafka", "kafkaEnable", false)
- kafka_host = inicfg.MustValue("kafka", "kafka_host", "127.0.0.1")
- kafka_port = inicfg.MustInt("kafka", "kafka_port", 9092)
- kafka_topic = inicfg.MustValue("kafka", "kafka_topic", "kafka_go_test")
- saslEnable = inicfg.MustBool("kafka", "saslEnable", false)
- username = inicfg.MustValue("kafka", "username", "root")
- saslpassword = inicfg.MustValue("kafka", "saslpassword", "root")
- tlsEnable = inicfg.MustBool("kafka", "tlsEnable", false)
- clientcert = inicfg.MustValue("kafka", "clientcert", "")
- clientkey = inicfg.MustValue("kafka", "clientkey", "")
- cacert = inicfg.MustValue("kafka", "cacert", "")
- MqttEnable = inicfg.MustBool("mqtt", "mqttEnable", false)
- KptCattleId = inicfg.MustValue("mqtt", "kptCattleId", "888888")
- mqtt_nodevalue = nodevalue
- mqtt_host = inicfg.MustValue("mqtt", "host", "127.0.0.1")
- mqtt_port = inicfg.MustInt("mqtt", "port", 9092)
- mqtt_path = inicfg.MustValue("mqtt", "path", "")
- mqtt_qos = inicfg.MustInt("mqtt", "qos", 0)
- mqtt_topic = inicfg.MustValue("mqtt", "topic", "mqtt_topic")
- mqtt_saslEnable = inicfg.MustBool("mqtt", "saslEnable", false)
- mqtt_username = inicfg.MustValue("mqtt", "username", "127.0.0.1")
- mqtt_saslpassword = inicfg.MustValue("mqtt", "saslpassword", "")
- mqtt_tlsEnable = inicfg.MustBool("mqtt", "tlsEnable", false)
- mqtt_clientcert = inicfg.MustValue("mqtt", "clientcert", "")
- mqtt_clientkey = inicfg.MustValue("mqtt", "clientkey", "")
- mqtt_cacert = inicfg.MustValue("mqtt", "cacert", "")
- mqtt_CleanSession = inicfg.MustBool("mqtt", "CleanSession", true)
- old = inicfg.MustBool("mqtt", "old", true)
- pastureid = inicfg.MustValue("canal", "pastureid", "0")
- tableField = make(map[string]map[string]string, 0)
- for _, table := range strings.Split(tables, ",") {
- dataMap, _ := inicfg.GetSection(table)
- if dataMap != nil {
- tableField[table] = dataMap
- }
- }
- // fmt.Println(tableField)
- ServiceName = inicfg.MustValue("Service", "ServiceName", "KPTDataService")
- ServiceDisplayName = inicfg.MustValue("Service", "ServiceDisplayName", "KPTDataService")
- ServiceDescription = inicfg.MustValue("Service", "ServiceDescription", "科湃腾数据同步")
- openini()
- file := CurrentPath + "/pos/pos.db"
- defer slowpoke.Close(CurrentPath + "pos/pos.db")
- keyN := []byte("startName")
- keyP := []byte("startPos")
- resN, err := slowpoke.Get(file, keyN)
- if err != nil || string(resN) == "" {
- logger.Error(" slowpoke.Get startName err", err, " posname :", resN)
- } else {
- res, err := slowpoke.Get(file, keyP)
- if err != nil {
- logger.Error(" slowpoke.Get startPos err", err)
- } else {
- str, err := strconv.Atoi(string(res))
- if err != nil || str == 0 {
- logger.Error("strconv.Atoi err", err, " pos :", str)
- } else {
- startName = string(resN)
- startPos = uint(str)
- }
- logger.Info("read pos name ", startPos, startName)
- }
- }
- logger.Info("end pos name", startPos, startName)
- }
- func initPos(_user, _password, _host, _dbs string, _port int) (int, string, error) {
- connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, "mysql")
- var err error
- posdb, err = sql.Open("mysql", connecting)
- if err == nil {
- posdb.SetMaxOpenConns(10)
- posdb.SetMaxIdleConns(10)
- } else {
- return 0, "", err
- }
- _, err = posdb.Exec("CREATE DATABASE IF NOT EXISTS `sqllog`")
- posdb.Close()
- connecting = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, "sqllog")
- posdb, err = sql.Open("mysql", connecting)
- if err == nil {
- posdb.SetMaxOpenConns(10)
- posdb.SetMaxIdleConns(10)
- } else {
- return 0, "", err
- }
- _, _ = posdb.Exec("CREATE TABLE IF NOT EXISTS `mypos` (`id` INT(11) NOT NULL ,`pos` INT(11) DEFAULT 0,`file` VARCHAR(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=INNODB DEFAULT CHARSET=utf8;")
- rows := posdb.QueryRow("select pos,file from mypos where id = 0 ")
- pos := 0
- file := ""
- if err := rows.Scan(&pos, &file); err != nil {
- _, err = posdb.Exec("insert mypos(id,pos,file) value (0,0,'')")
- }
- return pos, file, err
- }
- func openini() {
- CurrentPath, _ = GetCurrentPath()
- var err error
- inicfg, err = goconfig.LoadConfigFile(CurrentPath + "config.ini")
- if err != nil {
- logger.Errorln("读取配置文件失败[config.ini]", err.Error())
- return
- }
- }
- func setini(name string, pos uint32) {
- posname := Posname{name, pos}
- PosChan <- posname
- }
- func runsetini() {
- for {
- posname := <-PosChan
- logger.Info("binlog pos :", posname.pos, " binlog posname :", posname.name)
- CurrentPath, _ = GetCurrentPath()
- inicfg.SetValue("canal", "startName", posname.name)
- inicfg.SetValue("canal", "startPos", strconv.Itoa(int(posname.pos)))
- file := CurrentPath + "pos/pos.db"
- err := slowpoke.Set(file, []byte("startName"), []byte(posname.name))
- if err != nil {
- logger.Error(" slowpoke.Set startName err", err)
- }
- err = slowpoke.Set(file, []byte("startPos"), []byte(strconv.Itoa(int(posname.pos))))
- if err != nil {
- logger.Error(" slowpoke.Set startPos err", err)
- }
- err = slowpoke.Close(CurrentPath + "pos/pos.db")
- if err != nil {
- logger.Error(" slowpoke.Close err", err)
- }
- }
- }
- //func saveini() {
- //
- //
- //}
- func writeini(name string, pos uint32) {
- CurrentPath, _ = GetCurrentPath()
- inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
- if err != nil {
- logger.Infoln("读取配置文件失败[config.ini]")
- return
- }
- inicfg.SetValue("canal", "startName", name)
- inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
- err = goconfig.SaveConfigFile(inicfg, CurrentPath+"config.ini")
- if err != nil {
- logger.Infoln("保存配置文件失败[config.ini]")
- return
- }
- }
- func writeini1(name string, pos uint32) {
- CurrentPath, _ = GetCurrentPath()
- inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
- if err != nil {
- logger.Infoln("读取配置文件失败[config.ini]")
- return
- }
- inicfg.SetValue("canal", "startName", name)
- inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
- err = goconfig.SaveConfigFile(inicfg, CurrentPath+"config.ini")
- if err != nil {
- logger.Infoln("保存配置文件失败[config.ini]")
- return
- }
- }
- func writePosini(name string, pos uint32) {
- println("pos", pos)
- tx, _ := posdb.Begin()
- _, err := tx.Exec("update mypos set pos = ? , file = ? where id = 0", pos, name)
- if err != nil {
- tx.Rollback()
- } else {
- tx.Commit()
- }
- //CurrentPath, _ = GetCurrentPath()
- //inicfg, err := goconfig.LoadConfigFile(CurrentPath + "configPos.ini")
- //if err != nil {
- // logger.Errorln("读取配置文件失败[configPos.ini]")
- // return
- //}
- //inicfg.SetValue("canal", "startName", name)
- //inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
- //err = goconfig.SaveConfigFile(inicfg, CurrentPath+"configPos.ini")
- //if err != nil {
- // logger.Errorln("保存配置文件失败[configPos.ini]")
- // return
- //}
- }
// IntToBytes encodes n as an 8-byte big-endian signed 64-bit integer.
func IntToBytes(n int) []byte {
	var out bytes.Buffer
	// Writing a fixed-size integer into a bytes.Buffer cannot fail.
	_ = binary.Write(&out, binary.BigEndian, int64(n))
	return out.Bytes()
}
// BytesToInt decodes the first 8 bytes of bys as a big-endian signed 64-bit
// integer. Input shorter than 8 bytes yields 0; extra bytes are ignored.
func BytesToInt(bys []byte) int {
	var decoded int64
	// A short read leaves decoded at zero, which is the documented result.
	_ = binary.Read(bytes.NewReader(bys), binary.BigEndian, &decoded)
	return int(decoded)
}
- func RunService() {
- //Readini()
- var err error
- DqueueIndex, err = OpenIndexFile(CurrentPath+"logs", 1000000, 10000, false)
- if err != nil {
- fmt.Printf("create DqueueIndex err %v", err)
- os.Exit(1)
- }
- cfg = canal.NewDefaultConfig()
- cfg.Addr = fmt.Sprintf("%s:%d", host, port)
- cfg.User = user
- cfg.Password = password
- cfg.Flavor = flavor
- cfg.UseDecimal = true
- cfg.SemiSyncEnabled = true
- // cfg.ReadTimeout = time.Duration * 10
- cfg.ReadTimeout = time.Duration(readTimeout) * time.Second
- cfg.HeartbeatPeriod = time.Duration(heartbeatPeriod) * time.Second
- cfg.ServerID = uint32(serverID)
- cfg.Dump.ExecutionPath = CurrentPath + mysqldump
- cfg.Dump.DiscardErr = false
- cfg.Dump.MaxAllowedPacketMB = 500
- //cfg.Dump.TableDB = "tmrwatch"
- c, err := canal.NewCanal(cfg)
- if err != nil {
- fmt.Printf("create canal err %v", err)
- os.Exit(1)
- }
- if len(ignoreTables) > 0 {
- subs := strings.Split(ignoreTables, ",")
- for _, sub := range subs {
- if seps := strings.Split(sub, "."); len(seps) == 2 {
- c.AddDumpIgnoreTables(seps[0], seps[1])
- }
- }
- }
- if len(tables) > 0 && len(tableDB) > 0 {
- subs := strings.Split(tables, ",")
- c.AddDumpTables(tableDB, subs...)
- } else if len(tableDB) > 0 {
- subs := strings.Split(tableDB, ",")
- c.AddDumpDatabases(subs...)
- }
- c.SetEventHandler(&handler{})
- startPos_ := mysql.Position{
- Name: startName,
- Pos: uint32(startPos),
- }
- ctx := context.Background()
- ctx, cancel := context.WithCancel(ctx)
- go func() {
- if !(startPos > 0) {
- initDB()
- //SqlBitcask.DeleteAll()
- //SqlBitcask.Merge()
- insertEmpty()
- }
- err = c.RunFrom(startPos_)
- if err != nil { //出错后,将重头开始上传
- fmt.Printf("start canal err %v", err)
- logger.Error("start canal err %v", err)
- c.Close()
- c, err := canal.NewCanal(cfg)
- if err != nil {
- fmt.Printf("create canal err %v", err)
- os.Exit(1)
- }
- if len(ignoreTables) > 0 {
- subs := strings.Split(ignoreTables, ",")
- for _, sub := range subs {
- if seps := strings.Split(sub, "."); len(seps) == 2 {
- c.AddDumpIgnoreTables(seps[0], seps[1])
- }
- }
- }
- if len(tables) > 0 && len(tableDB) > 0 {
- subs := strings.Split(tables, ",")
- c.AddDumpTables(tableDB, subs...)
- } else if len(tableDB) > 0 {
- subs := strings.Split(tableDB, ",")
- c.AddDumpDatabases(subs...)
- }
- c.SetEventHandler(&handler{})
- ctx := context.Background()
- ctx, cancel = context.WithCancel(ctx)
- startPos_ = mysql.Position{
- Name: "",
- Pos: 0,
- }
- // initDB()
- insertEmpty()
- err = c.RunFrom(startPos_)
- logger.Error("start canal err second %v", err)
- }
- }()
- // s := cron.New()
- // s.AddFunc("00 40 07 * * *", func() {
- // insertEmpty()
- // fmt.Printf("start canal err %v", err)
- // logger.Error("start canal err %v", err)
- // c.Close()
- // c, err := canal.NewCanal(cfg)
- // if err != nil {
- // fmt.Printf("create canal err %v", err)
- // os.Exit(1)
- // }
- // if len(ignoreTables) > 0 {
- // subs := strings.Split(ignoreTables, ",")
- // for _, sub := range subs {
- // if seps := strings.Split(sub, "."); len(seps) == 2 {
- // c.AddDumpIgnoreTables(seps[0], seps[1])
- // }
- // }
- // }
- // if len(tables) > 0 && len(tableDB) > 0 {
- // subs := strings.Split(tables, ",")
- // c.AddDumpTables(tableDB, subs...)
- // } else if len(tableDB) > 0 {
- // subs := strings.Split(tableDB, ",")
- // c.AddDumpDatabases(subs...)
- // }
- // c.SetEventHandler(&handler{})
- // ctx := context.Background()
- // ctx, cancel = context.WithCancel(ctx)
- // startPos_ = mysql.Position{
- // Name: "",
- // Pos: 0,
- // }
- // // initDB()
- // err = c.RunFrom(startPos_)
- // logger.Error("start canal err second %v", err)
- // })
- // s.Start()
- //监听
- //ctxListen := context.Background()
- //ctxListen, cancelListen := context.WithCancel(ctxListen)
- //go func() {
- // ticker := time.Tick(60 * time.Second)
- // for {
- // select {
- // case <-ticker:
- // if time.Since(insertlog_lasttime) > 60*time.Second && Exectx != nil {
- //
- // err := Exectx.Commit()
- // Exectx = nil
- // if err != nil {
- // fmt.Println("Exectx.Commit()ticker err:", err)
- // }
- // insertlog_lasttime = time.Now()
- // }
- // case <-ctxListen.Done():
- // if Exectx != nil {
- //
- // err := Exectx.Commit()
- // Exectx = nil
- // if err != nil {
- // fmt.Println("Exectx.Commit()ticker err:", err)
- // }
- // insertlog_lasttime = time.Now()
- // fmt.Println("Exectx.Commit()tickerEXIT:", err)
- // return
- // }
- //
- // }
- // }
- //
- //}()
- if KafkaEnable {
- go Kafka_producerDB(ctx)
- // go Kafka_producerXXWDB(ctx)
- } else if MqttEnable {
- go Mqtt_producerDB(ctx)
- }
- Ch = make(chan int, 1)
- <-Ch
- //cancelListen()
- cancel()
- DqueueIndex.CloseIndex()
- //setini(endName,endPos)
- //writeini(endName,endPos)
- fmt.Println("程序被关闭 ")
- c.Close()
- }
|