12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266 |
- package kpt
- import (
- "bytes"
- "context"
- "database/sql"
- "encoding/binary"
- "errors"
- "fmt"
- "os"
- "os/exec"
- "path/filepath"
- "reflect"
- "runtime"
- "strconv"
- "strings"
- "time"
- "github.com/Unknwon/goconfig"
- "github.com/go-mysql-org/go-mysql/canal"
- "github.com/go-mysql-org/go-mysql/mysql"
- "github.com/go-mysql-org/go-mysql/replication"
- "github.com/go-mysql-org/go-mysql/schema"
- _ "github.com/go-sql-driver/mysql" //导入mysql驱动包
- "github.com/recoilme/slowpoke"
- "github.com/robfig/cron"
- "github.com/shopspring/decimal"
- "github.com/siddontang/go-log/log"
- )
- var (
- host string //"MySQL host")
- port int //"MySQL port")
- user string // "MySQL user, must have replication privilege")
- password string // "MySQL password")
- flavor string // "Flavor: mysql or mariadb")
- serverID int // "Unique Server ID")
- mysqldump string // "mysqldump execution path")
- tables string // "dump tables, seperated by comma, will overwrite dbs")
- tableDB string // "database for dump tables")
- ignoreTables string // "ignore tables, must be database.table format, separated by comma")
- startName string // "start sync from binlog name")
- startPos uint // "start sync from binlog position of")
- endName string // "start sync from binlog name")
- endPos uint32 // "start sync from binlog position of")
- heartbeatPeriod int // "master heartbeat period")
- readTimeout int // "connection read timeout")
- nodename string //nodename key name
- nodevalue string //nodevalue value
- showlog int
- ServiceName string //服务显示名称
- ServiceDisplayName string //服务名称
- ServiceDescription string //服务描述
- KafkaEnable bool
- MqttEnable bool
- CurrentPath string
- mydb *sql.DB // 全局的数据库操作句柄
- posdb *sql.DB // 全局的数据库操作句柄
- inicfg *goconfig.ConfigFile
- PosChan chan Posname
- stmt *sql.Stmt
- deletestmt *sql.Stmt
- logger *log.Logger
- Ch chan int
- insertstmt string = ""
- insertcount int = 0
- insertcountv int = 0
- Exectx *sql.Tx
- insertlog_lasttime time.Time
- DqueueIndex *DQueue
- DqueueIndexMqtt *DQueue
- mqttDb *sql.DB
- KptCattleId string
- old bool
- tableField map[string]map[string]string
- pastureid string
- i int
- )
// Type is the database type identifier.
const Type = "mysql"

// SQL statement templates consumed with fmt.Sprintf.
const (
	// Insert takes table name, column list and the joined value tuples.
	Insert = `INSERT INTO %s(%s) VALUES%s;`
	// Update takes table name, SET list and WHERE clause.
	Update = `UPDATE %s SET %s WHERE %s;`
	// Delete takes table name and WHERE clause.
	Delete = `DELETE FROM %s WHERE %s;`
	// DeleteAll takes the table name only.
	DeleteAll = `TRUNCATE TABLE %s`
)
// Posname is one binlog coordinate queued on PosChan for persistence.
type Posname struct {
	// name is the binlog file name.
	name string
	// pos is the offset inside that binlog file.
	pos uint32
}
// ReadWithSelect performs a non-blocking receive on sqlch.
//
// It returns the received value and true when a value (or the zero value of a
// closed channel) was available, and "", false when the receive would block.
func ReadWithSelect(sqlch chan string) (x string, res bool) {
	select {
	case received := <-sqlch:
		x, res = received, true
	default:
		// Nothing ready: keep the zero results.
	}
	return x, res
}
- func WriteChWithSelect(sqlch chan string, sql string) bool {
- timeout := time.NewTimer(time.Microsecond * 500)
- select {
- case sqlch <- sql:
- return true
- case <-timeout.C:
- {
- logger.Errorf("WriteChWithSelect 超时: %s", sql)
- fmt.Printf("WriteChWithSelect 超时: %s", sql)
- return false
- }
- }
- }
// GetCurrentPath returns the absolute directory of the running executable,
// using "/" as separator and including the trailing "/".
//
// The executable is located through exec.LookPath on os.Args[0]. An error is
// returned when it cannot be resolved or when the resulting path contains no
// separator.
func GetCurrentPath() (string, error) {
	located, err := exec.LookPath(os.Args[0])
	if err != nil {
		return "", err
	}
	absolute, err := filepath.Abs(located)
	if err != nil {
		return "", err
	}
	// Normalise Windows separators so the LastIndex search below works.
	if runtime.GOOS == "windows" {
		absolute = strings.ReplaceAll(absolute, "\\", "/")
	}
	cut := strings.LastIndex(absolute, "/")
	if cut < 0 {
		return "", errors.New(`Can't find "/" or "\".`)
	}
	return absolute[:cut+1], nil
}
- func GetColumns(e *canal.RowsEvent) []string {
- var Columns []string
- for _, value := range e.Table.Columns {
- Columns = append(Columns, "`"+value.Name+"`")
- }
- if strings.Trim(nodename, " ") != "" {
- Columns = append(Columns, strings.Trim(nodename, " "))
- }
- return Columns
- }
- func initDB() {
- err := GetDbsConnect(user, password, host, port, "mysql")
- _, err = mydb.Exec("CREATE DATABASE IF NOT EXISTS `sqllog`")
- mydb.Close()
- err = GetDbsConnect(user, password, host, port, "sqllog")
- if err == nil {
- _, _ = mydb.Exec("CREATE TABLE `tablesqllog` (`id` int(11) NOT NULL AUTO_INCREMENT,`sql` text DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;")
- insertEmpty()
- }
- }
- func GetDbsConnect(_user, _password, _host string, _port int, _dbs string) error {
- connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, _dbs)
- var err error
- mydb, err = sql.Open("mysql", connecting)
- if err == nil {
- mydb.SetMaxOpenConns(10)
- mydb.SetMaxIdleConns(10)
- } else {
- return err
- }
- return nil
- }
// GetmyDbsConnect opens and returns a new MySQL handle for database _dbs,
// limited to 10 open and 10 idle connections. Unlike GetDbsConnect it does
// not touch the package-level mydb. On failure it returns nil and the error.
func GetmyDbsConnect(_user, _password, _host string, _port int, _dbs string) (*sql.DB, error) {
	dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, _dbs)
	handle, err := sql.Open("mysql", dsn)
	if err != nil {
		return nil, err
	}
	handle.SetMaxOpenConns(10)
	handle.SetMaxIdleConns(10)
	return handle, nil
}
- func insertEmpty() {
- if len(tables) > 0 && len(tableDB) > 0 {
- subs := strings.Split(tables, ",")
- for i := len(subs) - 1; i >= 0; i-- {
- var where []string
- where = append(where, " 1 = 1")
- if strings.Trim(nodename, " ") != "" {
- where = append(where, nodename+" = "+"'"+nodevalue+"'")
- }
- sqls := fmt.Sprintf(Delete, subs[i], strings.Join(where, " and "))
- //insertlog(sqls)
- //savesqltofile(sqls)
- err := DqueueIndex.PushOneIndex([]byte(sqls))
- if err != nil {
- fmt.Println("DqueueIndex.PushOneIndex([]byte(sqls)) err:", err)
- }
- }
- //for _, value := range subs {
- //}
- }
- }
- //func () {
- // for i:= 0 ; i<100000000;i++ {
- // select {
- // case <-c2 :
- // fmt.Println("c2退出")
- // return
- // default:
- // err =dqueuePop.PushOneIndex(
- // []byte("a"+strconv.Itoa(i)),
- // )
- // if err != nil{
- // fmt.Println("dqueuePop.PushOneIndex err",err)
- //
- // }else {
- // fmt.Println("dqueuePop.PushOneIndex sucess","a"+strconv.Itoa(i))
- // }
- // //time.Sleep(1*time.Millisecond)
- // }
- // //err =dqueue.Push([][]byte{
- // // []byte("a"+strconv.Itoa(i)),
- // // []byte("b"+strconv.Itoa(i)),
- // //})
- // }
- //}
- func appendinsertsql(sqlstr string) {
- if insertcount == 0 {
- insertstmt = "insert into tablesqllog(`sql`) values(\"" + sqlstr + "\")"
- } else {
- insertstmt = insertstmt + ",(\"" + sqlstr + "\")"
- }
- insertcount++
- }
- func insertlog(sqlstr string) {
- var err error
- if mydb == nil {
- connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", user, password, host, port, "sqllog")
- mydb, err = sql.Open("mysql", connecting)
- if err != nil {
- fmt.Println(" sql.Open err", err)
- }
- mydb.SetMaxOpenConns(100)
- mydb.SetMaxIdleConns(10)
- mydb.Ping()
- }
- if Exectx == nil {
- Exectx, err = mydb.Begin()
- if err != nil {
- fmt.Println(err)
- }
- }
- if Exectx != nil {
- _, err = Exectx.Exec("insert into tablesqllog(`sql`) values(?);", sqlstr)
- if err != nil {
- fmt.Println("Exectx.Exec(", err)
- } else {
- insertcount++
- if time.Since(insertlog_lasttime) > 60*time.Second {
- err := Exectx.Commit()
- Exectx = nil
- if err != nil {
- fmt.Println("Exectx.Commit():", err)
- }
- insertlog_lasttime = time.Now() // get current time
- } else {
- if insertcount > 10000 {
- err := Exectx.Commit()
- Exectx = nil
- if err != nil {
- fmt.Println("Exectx.Commit()insertcount:", err)
- }
- insertlog_lasttime = time.Now() // get current time
- insertcount = 0
- }
- }
- }
- }
- }
// Exec runs _exsqlstr on mdb and returns the execution error, if any.
// A nil mdb is treated as a no-op and yields nil.
func Exec(mdb *sql.DB, _exsqlstr string) error {
	if mdb == nil {
		return nil
	}
	_, err := mdb.Exec(_exsqlstr)
	return err
}
// ExecT runs _exsqlstr inside the transaction mdb and returns the execution
// error, if any. A nil transaction is treated as a no-op and yields nil.
func ExecT(mdb *sql.Tx, _exsqlstr string) error {
	if mdb == nil {
		return nil
	}
	_, err := mdb.Exec(_exsqlstr)
	return err
}
// Get returns the id and SQL text of the oldest row (lowest id) in
// `tablesqllog`. It returns 0 and "" when mdb is nil, when the table is
// empty, or when the row cannot be scanned.
func Get(mdb *sql.DB) (int, string) {
	if mdb == nil {
		return 0, ""
	}
	var (
		id   int
		text string
	)
	row := mdb.QueryRow("select `id`,`sql` from `tablesqllog` order by id limit 1")
	if err := row.Scan(&id, &text); err != nil {
		return 0, ""
	}
	return id, text
}
- func delete(mdb *sql.DB, id int) error {
- if deletestmt == nil {
- deletestmt, _ = mdb.Prepare("delete from `tablesqllog` where id <= ? ")
- }
- if deletestmt != nil {
- _, err := deletestmt.Exec(id)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func GetDML(e *canal.RowsEvent) string {
- ColumnsOld := e.Table.Columns
- data := make(map[string]string, 0)
- if _, ok := tableField[e.Table.Name]; ok {
- data = tableField[e.Table.Name]
- }
- if !old {
- for _, v := range e.Table.Columns {
- if strings.Index(v.Name, "pmid") > -1 {
- return ""
- }
- }
- }
- updateMap := make(map[string]string, 0)
- deleteField := []string{}
- if e.Action == "insert" {
- fmt.Println(e.Action, e.Table.Name)
- if e.Table.Name == "downloadedplan" {
- fmt.Println(e.Table.Name)
- }
- if len(data) > 0 {
- Columns := make([]schema.TableColumn, 0)
- for _, v := range e.Table.Columns {
- if _, ok := data[v.Name]; ok {
- v.Name = data[v.Name]
- }
- Columns = append(Columns, v)
- }
- for datak, datav := range data {
- if datak == "deleteField" {
- continue
- }
- key := datak
- for _, v := range Columns {
- if key == v.Name || datav == v.Name {
- key = ""
- break
- }
- }
- if key != "" {
- Columns = append(Columns, schema.TableColumn{
- Name: key,
- })
- e.Rows[0] = append(e.Rows[0], datav)
- }
- }
- slist := []string{}
- for _, v := range Columns {
- slist = append(slist, v.Name)
- }
- if _, ok := data["deleteField"]; ok {
- // rows := e.Rows[0]
- var rowList [][]interface{}
- for _, row := range e.Rows {
- rows := row
- if e.Table.Name == "downloadedplan" {
- fmt.Println(len(rows), 1)
- }
- deleteFields := strings.Split(data["deleteField"], ",")
- for _, field := range deleteFields {
- index := -1
- for k, v := range Columns {
- if v.Name == field {
- index = k
- }
- }
- if index == 0 {
- Columns = Columns[index+1:]
- rows = rows[index+1:]
- } else if index == len(Columns)-1 {
- Columns = Columns[:index]
- rows = rows[:index]
- } else if index > 0 {
- var arr1 []schema.TableColumn
- arr1 = append(arr1, Columns[:index]...)
- arr1 = append(arr1, Columns[index+1:]...)
- Columns = arr1
- var arr2 []interface{}
- arr2 = append(arr2, rows[:index]...)
- arr2 = append(arr2, rows[index+1:]...)
- rows = arr2
- }
- if e.Table.Name == "downloadedplan" {
- fmt.Println(len(rows), 2)
- }
- rowList = append(rowList, rows)
- }
- }
- e.Rows = rowList
- // for k, _ := range e.Rows {
- // e.Rows[k] = rows
- // }
- }
- e.Table.Columns = Columns
- }
- } else {
- for k, v := range data {
- if k != "deleteField" {
- updateMap[k] = v
- } else {
- deleteField = strings.Split(v, ",")
- }
- }
- }
- query := ""
- if e.Action == "insert" {
- Columns := GetColumns(e)
- var rows []string
- var ids []string
- var idindex int
- for i, column := range Columns {
- if column == "id" {
- idindex = i
- break
- }
- }
- for _, value := range e.Rows {
- exist := false
- var frows []string
- for index, fvalue := range value {
- switch v := fvalue.(type) {
- case string:
- frows = append(frows, "'"+v+"'")
- case int8, int16, int32, int, int64:
- strV := fmt.Sprintf("%d", v)
- frows = append(frows, strV)
- case decimal.Decimal:
- frows = append(frows, v.String())
- case float32:
- frows = append(frows, strconv.FormatFloat(float64(v), 'f', -1, 32))
- case float64:
- frows = append(frows, strconv.FormatFloat(v, 'f', -1, 64))
- default:
- frows = append(frows, "NULL")
- }
- if index == idindex {
- if len(frows) > 0 {
- for _, id := range ids {
- if id == frows[len(frows)-1] {
- exist = true
- break
- }
- }
- if !exist {
- ids = append(ids, frows[len(frows)-1])
- }
- }
- }
- }
- if exist {
- continue
- }
- if strings.Trim(nodename, " ") != "" {
- frows = append(frows, "'"+nodevalue+"'")
- }
- rows = append(rows, "("+strings.Join(frows, ",")+")")
- }
- query = fmt.Sprintf(Insert, e.Table.Name, strings.Join(Columns, ","), strings.Join(rows, ","))
- } else if e.Action == "update" {
- var rows []string
- var where []string
- var uset []string
- for i, value := range e.Rows {
- if (i % 2) == 0 {
- where = append([]string{})
- for _, pk := range e.Table.PKColumns {
- name := e.Table.Columns[pk].Name
- if !old {
- if _, ok := updateMap[e.Table.Columns[pk].Name]; ok {
- name = updateMap[e.Table.Columns[pk].Name]
- }
- }
- isExist := false
- for _, fname := range deleteField {
- if fname == name {
- isExist = true
- break
- }
- }
- if isExist {
- continue
- }
- switch v := value[pk].(type) {
- case string:
- where = append(where, "`"+name+"` = '"+v+"'")
- case int8, int16, int32, int, int64:
- strV := fmt.Sprintf("%d", v)
- where = append(where, "`"+name+"` = "+strV)
- case decimal.Decimal:
- where = append(where, "`"+name+"` = "+v.String())
- case float32:
- where = append(where, "`"+name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
- case float64:
- where = append(where, "`"+name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
- default:
- where = append(where, "`"+name+"` is NULL")
- }
- }
- if strings.Trim(nodename, " ") != "" {
- where = append(where, nodename+" = "+"'"+nodevalue+"'")
- }
- // isExist := false
- // for _, w := range where {
- // if strings.Index(w, "pastureid") >= 0 {
- // isExist = true
- // break
- // }
- // }
- // if old && !isExist {
- // where = append(where, " pastureid"+" = "+"'"+pastureid+"'")
- // } else if !isExist {
- // where = append(where, " pasturecode"+" = "+"'"+pastureid+"'")
- // }
- } else {
- uset = append([]string{})
- for j, _ := range e.Table.Columns {
- xx := reflect.DeepEqual(e.Rows[i-1][j], value[j])
- if !xx {
- name := e.Table.Columns[j].Name
- if !old {
- if _, ok := updateMap[e.Table.Columns[j].Name]; ok {
- name = updateMap[e.Table.Columns[j].Name]
- }
- }
- isExist := false
- for _, fname := range deleteField {
- if fname == name {
- isExist = true
- break
- }
- }
- if isExist {
- continue
- }
- switch v := value[j].(type) {
- case string:
- uset = append(uset, "`"+name+"` = '"+v+"'")
- case int8, int16, int32, int, int64:
- strV := fmt.Sprintf("%d", v)
- uset = append(uset, "`"+name+"` = "+strV)
- case decimal.Decimal:
- uset = append(uset, "`"+name+"` = "+v.String())
- case float32:
- uset = append(uset, "`"+name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
- case float64:
- uset = append(uset, "`"+name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
- default:
- uset = append(uset, "`"+name+"` = NULL")
- }
- }
- }
- if len(uset) == 0 {
- return ""
- }
- rows = append(rows, fmt.Sprintf(Update, e.Table.Name, strings.Join(uset, ", "), strings.Join(where, " and ")))
- }
- }
- query = strings.Join(rows, "\n")
- } else if e.Action == "delete" {
- var rows []string
- for _, value := range e.Rows {
- var where []string
- for _, pk := range e.Table.PKColumns {
- name := e.Table.Columns[pk].Name
- if !old {
- if _, ok := updateMap[e.Table.Columns[pk].Name]; ok {
- name = updateMap[e.Table.Columns[pk].Name]
- }
- }
- isExist := false
- for _, fname := range deleteField {
- if fname == name {
- isExist = true
- break
- }
- }
- if isExist {
- continue
- }
- switch v := value[pk].(type) {
- case string:
- where = append(where, "`"+e.Table.Columns[pk].Name+"` = '"+v+"'")
- case int8, int16, int32, int, int64:
- strV := fmt.Sprintf("%d", v)
- where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strV)
- case float32:
- where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
- case float64:
- where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
- default:
- where = append(where, "`"+e.Table.Columns[pk].Name+"` is NULL")
- }
- }
- if strings.Trim(nodename, " ") != "" {
- where = append(where, nodename+" = "+"'"+nodevalue+"'")
- }
- // isExist := false
- // for _, w := range where {
- // if strings.Index(w, "pastureid") >= 0 {
- // isExist = true
- // break
- // }
- // }
- // if !old {
- // where = append(where, " pastureid"+" = "+"'"+pastureid+"'")
- // } else if isExist {
- // where = append(where, " pasturecode"+" = "+"'"+pastureid+"'")
- // }
- rows = append(rows, fmt.Sprintf(Delete, e.Table.Name, strings.Join(where, " and ")))
- }
- query = strings.Join(rows, "\n")
- }
- e.Table.Columns = ColumnsOld
- return query
- }
- type handler struct {
- canal.DummyEventHandler
- }
// OpenFile opens filename for appending, creating it first when it does not
// exist, and prints which of the two cases applied.
//
// An existing file is opened with O_RDWR|O_APPEND. The previous flag set
// (O_APPEND alone) implied read-only access, so every write on the returned
// file failed.
func OpenFile(filename string) (*os.File, error) {
	if _, err := os.Stat(filename); os.IsNotExist(err) {
		fmt.Println("文件不存在")
		return os.Create(filename)
	}
	fmt.Println("文件存在")
	return os.OpenFile(filename, os.O_RDWR|os.O_APPEND, 0666)
}
- func (h *handler) OnRow(e *canal.RowsEvent) error {
- needsave := 0
- if len(tables) > 0 && len(tableDB) > 0 {
- subs := strings.Split(tables, ",")
- for _, value := range subs {
- if strings.ToLower(e.Table.Name) == strings.ToLower(value) {
- needsave = 1
- break
- }
- }
- } else {
- needsave = 1
- }
- if needsave > 0 {
- s := GetDML(e)
- if len(s) > 0 {
- if showlog > 0 {
- fmt.Println(s)
- }
- if strings.Index(s, "downloadedplan") > 0 {
- fmt.Println(s)
- }
- //insertlog(s)
- //sqlDeque.PushBack(s)
- //mysqlQueue.Enqueue(s)
- //WriteChWithSelect(mysqlch, s)
- //savesqltofile(s)
- err := DqueueIndex.PushOneIndex([]byte(s))
- if err != nil {
- fmt.Println("DqueueIndex.PushOneIndex([]byte(sqls)) err:", err)
- }
- }
- }
- return nil
- }
- func (h *handler) OnRotate(roateEvent *replication.RotateEvent) error {
- //writePosini(string(roateEvent.NextLogName), uint32(roateEvent.Position))
- setini(string(roateEvent.NextLogName), uint32(roateEvent.Position))
- return nil
- }
- func (h *handler) OnPosSynced(p mysql.Position, G mysql.GTIDSet, g bool) error {
- // fmt.Println(p.Name + " | " + strconv.Itoa(int(p.Pos)))
- endName = p.Name
- endPos = p.Pos
- //writePosini(p.Name, p.Pos)
- setini(p.Name, p.Pos)
- return nil
- }
- func (h *handler) String() string {
- return "TestHandler"
- }
- func Readini() {
- CurrentPath, _ = GetCurrentPath()
- h, _ := log.NewRotatingFileHandler(CurrentPath+"Consumer.log", 40*1024*1024, 10)
- logger = log.NewDefault(h)
- logger.SetLevelByName("info")
- inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
- if err != nil {
- logger.Errorln("读取配置文件失败1[config.ini]", err.Error())
- return
- }
- PosChan = make(chan Posname, 10000)
- go runsetini()
- host = inicfg.MustValue("canal", "host", "127.0.0.1")
- port = inicfg.MustInt("canal", "port", 3306)
- user = inicfg.MustValue("canal", "user", "root")
- password = inicfg.MustValue("canal", "password", "root")
- flavor = inicfg.MustValue("canal", "flavor", "mariadb")
- serverID = inicfg.MustInt("canal", "serverID", 101)
- mysqldump = inicfg.MustValue("canal", "mysqldump", "mysqldump")
- tables = inicfg.MustValue("canal", "tables", "")
- tableDB = inicfg.MustValue("canal", "tableDB", "")
- ignoreTables = inicfg.MustValue("canal", "ignoreTables", "")
- startName = inicfg.MustValue("canal", "startName", "")
- startPos = uint(inicfg.MustInt("canal", "startPos", 0))
- endName = startName
- endPos = uint32(startPos)
- heartbeatPeriod = inicfg.MustInt("canal", "heartbeatPeriod", 60)
- readTimeout = inicfg.MustInt("canal", "readTimeout", 90)
- nodename = inicfg.MustValue("canal", "nodename", "")
- nodevalue = inicfg.MustValue("canal", "nodevalue", "1")
- showlog = inicfg.MustInt("canal", "showlog", 1)
- onetimerows = inicfg.MustValue("canal", "onetimerows", "1000")
- KafkaEnable = inicfg.MustBool("kafka", "kafkaEnable", false)
- kafka_host = inicfg.MustValue("kafka", "kafka_host", "127.0.0.1")
- kafka_port = inicfg.MustInt("kafka", "kafka_port", 9092)
- kafka_topic = inicfg.MustValue("kafka", "kafka_topic", "kafka_go_test")
- saslEnable = inicfg.MustBool("kafka", "saslEnable", false)
- username = inicfg.MustValue("kafka", "username", "root")
- saslpassword = inicfg.MustValue("kafka", "saslpassword", "root")
- tlsEnable = inicfg.MustBool("kafka", "tlsEnable", false)
- clientcert = inicfg.MustValue("kafka", "clientcert", "")
- clientkey = inicfg.MustValue("kafka", "clientkey", "")
- cacert = inicfg.MustValue("kafka", "cacert", "")
- MqttEnable = inicfg.MustBool("mqtt", "mqttEnable", false)
- KptCattleId = inicfg.MustValue("mqtt", "kptCattleId", "888888")
- mqtt_nodevalue = nodevalue
- mqtt_host = inicfg.MustValue("mqtt", "host", "127.0.0.1")
- mqtt_port = inicfg.MustInt("mqtt", "port", 9092)
- mqtt_path = inicfg.MustValue("mqtt", "path", "")
- mqtt_qos = inicfg.MustInt("mqtt", "qos", 0)
- mqtt_topic = inicfg.MustValue("mqtt", "topic", "mqtt_topic")
- mqtt_saslEnable = inicfg.MustBool("mqtt", "saslEnable", false)
- mqtt_username = inicfg.MustValue("mqtt", "username", "127.0.0.1")
- mqtt_saslpassword = inicfg.MustValue("mqtt", "saslpassword", "")
- mqtt_tlsEnable = inicfg.MustBool("mqtt", "tlsEnable", false)
- mqtt_clientcert = inicfg.MustValue("mqtt", "clientcert", "")
- mqtt_clientkey = inicfg.MustValue("mqtt", "clientkey", "")
- mqtt_cacert = inicfg.MustValue("mqtt", "cacert", "")
- mqtt_CleanSession = inicfg.MustBool("mqtt", "CleanSession", true)
- old = inicfg.MustBool("mqtt", "old", true)
- pastureid = inicfg.MustValue("canal", "pastureid", "0")
- tableField = make(map[string]map[string]string, 0)
- for _, table := range strings.Split(tables, ",") {
- dataMap, _ := inicfg.GetSection(table)
- if dataMap != nil {
- tableField[table] = dataMap
- }
- }
- // fmt.Println(tableField)
- ServiceName = inicfg.MustValue("Service", "ServiceName", "KPTDataService")
- ServiceDisplayName = inicfg.MustValue("Service", "ServiceDisplayName", "KPTDataService")
- ServiceDescription = inicfg.MustValue("Service", "ServiceDescription", "科湃腾数据同步")
- openini()
- file := CurrentPath + "/pos/pos.db"
- defer slowpoke.Close(CurrentPath + "pos/pos.db")
- keyN := []byte("startName")
- keyP := []byte("startPos")
- resN, err := slowpoke.Get(file, keyN)
- if err != nil || string(resN) == "" {
- logger.Error(" slowpoke.Get startName err", err, " posname :", resN)
- } else {
- res, err := slowpoke.Get(file, keyP)
- if err != nil {
- logger.Error(" slowpoke.Get startPos err", err)
- } else {
- str, err := strconv.Atoi(string(res))
- if err != nil || str == 0 {
- logger.Error("strconv.Atoi err", err, " pos :", str)
- } else {
- startName = string(resN)
- startPos = uint(str)
- }
- logger.Info("read pos name ", startPos, startName)
- }
- }
- logger.Info("end pos name", startPos, startName)
- }
- func initPos(_user, _password, _host, _dbs string, _port int) (int, string, error) {
- connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, "mysql")
- var err error
- posdb, err = sql.Open("mysql", connecting)
- if err == nil {
- posdb.SetMaxOpenConns(10)
- posdb.SetMaxIdleConns(10)
- } else {
- return 0, "", err
- }
- _, err = posdb.Exec("CREATE DATABASE IF NOT EXISTS `sqllog`")
- posdb.Close()
- connecting = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, "sqllog")
- posdb, err = sql.Open("mysql", connecting)
- if err == nil {
- posdb.SetMaxOpenConns(10)
- posdb.SetMaxIdleConns(10)
- } else {
- return 0, "", err
- }
- _, _ = posdb.Exec("CREATE TABLE IF NOT EXISTS `mypos` (`id` INT(11) NOT NULL ,`pos` INT(11) DEFAULT 0,`file` VARCHAR(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=INNODB DEFAULT CHARSET=utf8;")
- rows := posdb.QueryRow("select pos,file from mypos where id = 0 ")
- pos := 0
- file := ""
- if err := rows.Scan(&pos, &file); err != nil {
- _, err = posdb.Exec("insert mypos(id,pos,file) value (0,0,'')")
- }
- return pos, file, err
- }
- func openini() {
- CurrentPath, _ = GetCurrentPath()
- var err error
- inicfg, err = goconfig.LoadConfigFile(CurrentPath + "config.ini")
- if err != nil {
- logger.Errorln("读取配置文件失败[config.ini]", err.Error())
- return
- }
- }
- func setini(name string, pos uint32) {
- posname := Posname{name, pos}
- PosChan <- posname
- }
- func runsetini() {
- for {
- posname := <-PosChan
- logger.Info("binlog pos :", posname.pos, " binlog posname :", posname.name)
- CurrentPath, _ = GetCurrentPath()
- inicfg.SetValue("canal", "startName", posname.name)
- inicfg.SetValue("canal", "startPos", strconv.Itoa(int(posname.pos)))
- file := CurrentPath + "pos/pos.db"
- err := slowpoke.Set(file, []byte("startName"), []byte(posname.name))
- if err != nil {
- logger.Error(" slowpoke.Set startName err", err)
- }
- err = slowpoke.Set(file, []byte("startPos"), []byte(strconv.Itoa(int(posname.pos))))
- if err != nil {
- logger.Error(" slowpoke.Set startPos err", err)
- }
- err = slowpoke.Close(CurrentPath + "pos/pos.db")
- if err != nil {
- logger.Error(" slowpoke.Close err", err)
- }
- }
- }
- //func saveini() {
- //
- //
- //}
- func writeini(name string, pos uint32) {
- CurrentPath, _ = GetCurrentPath()
- inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
- if err != nil {
- logger.Infoln("读取配置文件失败[config.ini]")
- return
- }
- inicfg.SetValue("canal", "startName", name)
- inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
- err = goconfig.SaveConfigFile(inicfg, CurrentPath+"config.ini")
- if err != nil {
- logger.Infoln("保存配置文件失败[config.ini]")
- return
- }
- }
- func writeini1(name string, pos uint32) {
- CurrentPath, _ = GetCurrentPath()
- inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
- if err != nil {
- logger.Infoln("读取配置文件失败[config.ini]")
- return
- }
- inicfg.SetValue("canal", "startName", name)
- inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
- err = goconfig.SaveConfigFile(inicfg, CurrentPath+"config.ini")
- if err != nil {
- logger.Infoln("保存配置文件失败[config.ini]")
- return
- }
- }
- func writePosini(name string, pos uint32) {
- println("pos", pos)
- tx, _ := posdb.Begin()
- _, err := tx.Exec("update mypos set pos = ? , file = ? where id = 0", pos, name)
- if err != nil {
- tx.Rollback()
- } else {
- tx.Commit()
- }
- //CurrentPath, _ = GetCurrentPath()
- //inicfg, err := goconfig.LoadConfigFile(CurrentPath + "configPos.ini")
- //if err != nil {
- // logger.Errorln("读取配置文件失败[configPos.ini]")
- // return
- //}
- //inicfg.SetValue("canal", "startName", name)
- //inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
- //err = goconfig.SaveConfigFile(inicfg, CurrentPath+"configPos.ini")
- //if err != nil {
- // logger.Errorln("保存配置文件失败[configPos.ini]")
- // return
- //}
- }
// IntToBytes encodes n as an 8-byte big-endian two's-complement value.
func IntToBytes(n int) []byte {
	var encoded bytes.Buffer
	binary.Write(&encoded, binary.BigEndian, int64(n))
	return encoded.Bytes()
}
// BytesToInt decodes the first 8 bytes of bys as a big-endian int64 and
// returns it as int. Input shorter than 8 bytes yields 0.
func BytesToInt(bys []byte) int {
	var decoded int64
	binary.Read(bytes.NewReader(bys), binary.BigEndian, &decoded)
	return int(decoded)
}
- func RunService() {
- //Readini()
- var err error
- DqueueIndex, err = OpenIndexFile(CurrentPath+"logs", 100000, 10000, false)
- if err != nil {
- fmt.Printf("create DqueueIndex err %v", err)
- os.Exit(1)
- }
- cfg := canal.NewDefaultConfig()
- cfg.Addr = fmt.Sprintf("%s:%d", host, port)
- cfg.User = user
- cfg.Password = password
- cfg.Flavor = flavor
- cfg.UseDecimal = true
- cfg.SemiSyncEnabled = true
- // cfg.ReadTimeout = time.Duration * 10
- cfg.ReadTimeout = time.Duration(readTimeout) * time.Second
- cfg.HeartbeatPeriod = time.Duration(heartbeatPeriod) * time.Second
- cfg.ServerID = uint32(serverID)
- cfg.Dump.ExecutionPath = CurrentPath + mysqldump
- cfg.Dump.DiscardErr = false
- cfg.Dump.MaxAllowedPacketMB = 500
- //cfg.Dump.TableDB = "tmrwatch"
- c, err := canal.NewCanal(cfg)
- if err != nil {
- fmt.Printf("create canal err %v", err)
- os.Exit(1)
- }
- if len(ignoreTables) > 0 {
- subs := strings.Split(ignoreTables, ",")
- for _, sub := range subs {
- if seps := strings.Split(sub, "."); len(seps) == 2 {
- c.AddDumpIgnoreTables(seps[0], seps[1])
- }
- }
- }
- if len(tables) > 0 && len(tableDB) > 0 {
- subs := strings.Split(tables, ",")
- c.AddDumpTables(tableDB, subs...)
- } else if len(tableDB) > 0 {
- subs := strings.Split(tableDB, ",")
- c.AddDumpDatabases(subs...)
- }
- c.SetEventHandler(&handler{})
- startPos_ := mysql.Position{
- Name: startName,
- Pos: uint32(startPos),
- }
- ctx := context.Background()
- ctx, cancel := context.WithCancel(ctx)
- go func() {
- if !(startPos > 0) {
- //initDB()
- //SqlBitcask.DeleteAll()
- //SqlBitcask.Merge()
- insertEmpty()
- }
- err = c.RunFrom(startPos_)
- if err != nil { //出错后,将重头开始上传
- fmt.Printf("start canal err %v", err)
- logger.Error("start canal err %v", err)
- c.Close()
- c, err := canal.NewCanal(cfg)
- if err != nil {
- fmt.Printf("create canal err %v", err)
- os.Exit(1)
- }
- if len(ignoreTables) > 0 {
- subs := strings.Split(ignoreTables, ",")
- for _, sub := range subs {
- if seps := strings.Split(sub, "."); len(seps) == 2 {
- c.AddDumpIgnoreTables(seps[0], seps[1])
- }
- }
- }
- if len(tables) > 0 && len(tableDB) > 0 {
- subs := strings.Split(tables, ",")
- c.AddDumpTables(tableDB, subs...)
- } else if len(tableDB) > 0 {
- subs := strings.Split(tableDB, ",")
- c.AddDumpDatabases(subs...)
- }
- c.SetEventHandler(&handler{})
- ctx := context.Background()
- ctx, cancel = context.WithCancel(ctx)
- startPos_ = mysql.Position{
- Name: "",
- Pos: 0,
- }
- // initDB()
- insertEmpty()
- err = c.RunFrom(startPos_)
- logger.Error("start canal err second %v", err)
- }
- }()
- // s := cron.New()
- // s.AddFunc("00 40 07 * * *", func() {
- // insertEmpty()
- // fmt.Printf("start canal err %v", err)
- // logger.Error("start canal err %v", err)
- // c.Close()
- // c, err := canal.NewCanal(cfg)
- // if err != nil {
- // fmt.Printf("create canal err %v", err)
- // os.Exit(1)
- // }
- // if len(ignoreTables) > 0 {
- // subs := strings.Split(ignoreTables, ",")
- // for _, sub := range subs {
- // if seps := strings.Split(sub, "."); len(seps) == 2 {
- // c.AddDumpIgnoreTables(seps[0], seps[1])
- // }
- // }
- // }
- // if len(tables) > 0 && len(tableDB) > 0 {
- // subs := strings.Split(tables, ",")
- // c.AddDumpTables(tableDB, subs...)
- // } else if len(tableDB) > 0 {
- // subs := strings.Split(tableDB, ",")
- // c.AddDumpDatabases(subs...)
- // }
- // c.SetEventHandler(&handler{})
- // ctx := context.Background()
- // ctx, cancel = context.WithCancel(ctx)
- // startPos_ = mysql.Position{
- // Name: "",
- // Pos: 0,
- // }
- // // initDB()
- // err = c.RunFrom(startPos_)
- // logger.Error("start canal err second %v", err)
- // })
- // s.Start()
- //监听
- //ctxListen := context.Background()
- //ctxListen, cancelListen := context.WithCancel(ctxListen)
- //go func() {
- // ticker := time.Tick(60 * time.Second)
- // for {
- // select {
- // case <-ticker:
- // if time.Since(insertlog_lasttime) > 60*time.Second && Exectx != nil {
- //
- // err := Exectx.Commit()
- // Exectx = nil
- // if err != nil {
- // fmt.Println("Exectx.Commit()ticker err:", err)
- // }
- // insertlog_lasttime = time.Now()
- // }
- // case <-ctxListen.Done():
- // if Exectx != nil {
- //
- // err := Exectx.Commit()
- // Exectx = nil
- // if err != nil {
- // fmt.Println("Exectx.Commit()ticker err:", err)
- // }
- // insertlog_lasttime = time.Now()
- // fmt.Println("Exectx.Commit()tickerEXIT:", err)
- // return
- // }
- //
- // }
- // }
- //
- //}()
- if KafkaEnable {
- go Kafka_producerDB(ctx)
- } else if MqttEnable {
- go Mqtt_producerDB(ctx)
- }
- Ch = make(chan int, 1)
- <-Ch
- //cancelListen()
- cancel()
- DqueueIndex.CloseIndex()
- //setini(endName,endPos)
- //writeini(endName,endPos)
- fmt.Println("程序被关闭 ")
- c.Close()
- }
- func NewCronWithBeijingLocation() *cron.Cron {
- loc, err := time.LoadLocation("Asia/Shanghai")
- if err != nil {
- panic(err)
- }
- return cron.NewWithLocation(loc)
- }
- if _, ok := data["deleteField"]; ok {
- rows := e.Rows[0]
- deleteFields := strings.Split(data["deleteField"], ",")
- for _, field := range deleteFields {
- index := -1
- for k, v := range Columns {
- if v.Name == field {
- index = k
- }
- }
- if index == 0 {
- Columns = Columns[index+1:]
- rows = rows[index+1:]
- } else if index == len(Columns)-1 {
- Columns = Columns[:index]
- rows = rows[:index]
- } else if index > 0 {
- var arr1 []schema.TableColumn
- arr1 = append(arr1, Columns[:index]...)
- arr1 = append(arr1, Columns[index+1:]...)
- Columns = arr1
- var arr2 []interface{}
- arr2 = append(arr2, rows[:index]...)
- arr2 = append(arr2, rows[index+1:]...)
- rows = arr2
- }
- }
- for k, _ := range e.Rows {
- e.Rows[k] = rows
- }
- }
|