gocanal.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264
  1. package kpt
  2. import (
  3. "bytes"
  4. "context"
  5. "database/sql"
  6. "encoding/binary"
  7. "errors"
  8. "fmt"
  9. "os"
  10. "os/exec"
  11. "path/filepath"
  12. "reflect"
  13. "runtime"
  14. "strconv"
  15. "strings"
  16. "time"
  17. "github.com/Unknwon/goconfig"
  18. "github.com/go-mysql-org/go-mysql/canal"
  19. "github.com/go-mysql-org/go-mysql/mysql"
  20. "github.com/go-mysql-org/go-mysql/replication"
  21. "github.com/go-mysql-org/go-mysql/schema"
  22. _ "github.com/go-sql-driver/mysql" //导入mysql驱动包
  23. "github.com/recoilme/slowpoke"
  24. "github.com/shopspring/decimal"
  25. "github.com/siddontang/go-log/log"
  26. )
// Package-level configuration and runtime state. The configuration values are
// filled in by Readini from config.ini; the remaining variables hold shared
// handles and counters used by the functions in this file.
var (
	host string // MySQL host
	port int // MySQL port
	user string // MySQL user, must have replication privilege
	password string // MySQL password
	cfg *canal.Config
	flavor string // flavor: mysql or mariadb
	serverID int // unique server ID
	mysqldump string // mysqldump execution path
	tables string // dump tables, separated by comma, will overwrite dbs
	tableDB string // database for dump tables
	ignoreTables string // ignore tables, must be database.table format, separated by comma
	startName string // binlog file name to start syncing from
	startPos uint // binlog position to start syncing from
	endName string // binlog file name of the last synced position (updated in OnPosSynced)
	endPos uint32 // binlog offset of the last synced position (updated in OnPosSynced)
	heartbeatPeriod int // master heartbeat period
	readTimeout int // connection read timeout
	nodename string // name of the extra node column appended to generated SQL
	nodevalue string // value written for the node column
	showlog int
	ServiceName string // service name
	ServiceDisplayName string // service display name
	ServiceDescription string // service description
	KafkaEnable bool
	MqttEnable bool
	CurrentPath string
	mydb *sql.DB // global database handle
	posdb *sql.DB // global database handle used by initPos
	inicfg *goconfig.ConfigFile
	PosChan chan Posname
	stmt *sql.Stmt
	deletestmt *sql.Stmt
	logger *log.Logger
	Ch chan int
	insertstmt string = ""
	insertcount int = 0
	insertcountv int = 0
	Exectx *sql.Tx
	insertlog_lasttime time.Time
	DqueueIndex *DQueue
	DqueueIndexMqtt *DQueue
	mqttDb *sql.DB
	KptCattleId string
	old bool
	tableField map[string]map[string]string // per-table config section, keyed by table name (built in Readini)
	pastureid string
	i int
)
// Insert, Update, Delete and DeleteAll are fmt format strings for the SQL
// statements generated in this file; each %s verb is filled in by the caller.
const (
	Type = "mysql"
	Insert = `INSERT INTO %s(%s) VALUES%s;` // table, column list, value tuples
	Update = `UPDATE %s SET %s WHERE %s;` // table, assignments, conditions
	Delete = `DELETE FROM %s WHERE %s;` // table, conditions
	DeleteAll = `TRUNCATE TABLE %s` // table
)
// Posname is a binlog position: the binlog file name plus the offset inside
// it. Values are sent through PosChan by setini and consumed by runsetini.
type Posname struct {
	name string // binlog file name
	pos uint32 // offset within the binlog file
}
  87. func ReadWithSelect(sqlch chan string) (x string, res bool) {
  88. select {
  89. case x = <-sqlch:
  90. return x, true
  91. default:
  92. return "", false
  93. }
  94. }
  95. func WriteChWithSelect(sqlch chan string, sql string) bool {
  96. timeout := time.NewTimer(time.Microsecond * 500)
  97. select {
  98. case sqlch <- sql:
  99. return true
  100. case <-timeout.C:
  101. {
  102. logger.Errorf("WriteChWithSelect 超时: %s", sql)
  103. fmt.Printf("WriteChWithSelect 超时: %s", sql)
  104. return false
  105. }
  106. }
  107. }
  108. func GetCurrentPath() (string, error) {
  109. file, err := exec.LookPath(os.Args[0])
  110. if err != nil {
  111. return "", err
  112. }
  113. path, err := filepath.Abs(file)
  114. if err != nil {
  115. return "", err
  116. }
  117. //fmt.Println("path111:", path)
  118. if runtime.GOOS == "windows" {
  119. path = strings.Replace(path, "\\", "/", -1)
  120. }
  121. //fmt.Println("path222:", path)
  122. i := strings.LastIndex(path, "/")
  123. if i < 0 {
  124. return "", errors.New(`Can't find "/" or "\".`)
  125. }
  126. //fmt.Println("path333:", path)
  127. return string(path[0 : i+1]), nil
  128. }
  129. func GetColumns(e *canal.RowsEvent) []string {
  130. var Columns []string
  131. for _, value := range e.Table.Columns {
  132. Columns = append(Columns, "`"+value.Name+"`")
  133. }
  134. if strings.Trim(nodename, " ") != "" {
  135. Columns = append(Columns, strings.Trim(nodename, " "))
  136. }
  137. return Columns
  138. }
  139. func initDB() {
  140. err := GetDbsConnect(user, password, host, port, "mysql")
  141. _, err = mydb.Exec("CREATE DATABASE IF NOT EXISTS `sqllog`")
  142. mydb.Close()
  143. err = GetDbsConnect(user, password, host, port, "sqllog")
  144. if err == nil {
  145. _, _ = mydb.Exec("CREATE TABLE `tablesqllog` (`id` int(11) NOT NULL AUTO_INCREMENT,`sql` text DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;")
  146. insertEmpty()
  147. }
  148. }
  149. func GetDbsConnect(_user, _password, _host string, _port int, _dbs string) error {
  150. connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, _dbs)
  151. var err error
  152. mydb, err = sql.Open("mysql", connecting)
  153. if err == nil {
  154. mydb.SetMaxOpenConns(10)
  155. mydb.SetMaxIdleConns(10)
  156. } else {
  157. return err
  158. }
  159. return nil
  160. }
  161. func GetmyDbsConnect(_user, _password, _host string, _port int, _dbs string) (*sql.DB, error) {
  162. connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, _dbs)
  163. mdb, err := sql.Open("mysql", connecting)
  164. if err == nil {
  165. mdb.SetMaxOpenConns(10)
  166. mdb.SetMaxIdleConns(10)
  167. } else {
  168. return nil, err
  169. }
  170. return mdb, nil
  171. }
  172. func insertEmpty() {
  173. if len(tables) > 0 && len(tableDB) > 0 {
  174. subs := strings.Split(tables, ",")
  175. for i := len(subs) - 1; i >= 0; i-- {
  176. var where []string
  177. where = append(where, " 1 = 1")
  178. if strings.Trim(nodename, " ") != "" {
  179. where = append(where, nodename+" = "+"'"+nodevalue+"'")
  180. }
  181. sqls := fmt.Sprintf(Delete, subs[i], strings.Join(where, " and "))
  182. //insertlog(sqls)
  183. //savesqltofile(sqls)
  184. err := DqueueIndex.PushOneIndex([]byte(sqls))
  185. if err != nil {
  186. fmt.Println("DqueueIndex.PushOneIndex([]byte(sqls)) err:", err)
  187. }
  188. }
  189. //for _, value := range subs {
  190. //}
  191. }
  192. }
  193. //func () {
  194. // for i:= 0 ; i<100000000;i++ {
  195. // select {
  196. // case <-c2 :
  197. // fmt.Println("c2退出")
  198. // return
  199. // default:
  200. // err =dqueuePop.PushOneIndex(
  201. // []byte("a"+strconv.Itoa(i)),
  202. // )
  203. // if err != nil{
  204. // fmt.Println("dqueuePop.PushOneIndex err",err)
  205. //
  206. // }else {
  207. // fmt.Println("dqueuePop.PushOneIndex sucess","a"+strconv.Itoa(i))
  208. // }
  209. // //time.Sleep(1*time.Millisecond)
  210. // }
  211. // //err =dqueue.Push([][]byte{
  212. // // []byte("a"+strconv.Itoa(i)),
  213. // // []byte("b"+strconv.Itoa(i)),
  214. // //})
  215. // }
  216. //}
  217. func appendinsertsql(sqlstr string) {
  218. if insertcount == 0 {
  219. insertstmt = "insert into tablesqllog(`sql`) values(\"" + sqlstr + "\")"
  220. } else {
  221. insertstmt = insertstmt + ",(\"" + sqlstr + "\")"
  222. }
  223. insertcount++
  224. }
  225. func insertlog(sqlstr string) {
  226. var err error
  227. if mydb == nil {
  228. connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", user, password, host, port, "sqllog")
  229. mydb, err = sql.Open("mysql", connecting)
  230. if err != nil {
  231. fmt.Println(" sql.Open err", err)
  232. }
  233. mydb.SetMaxOpenConns(100)
  234. mydb.SetMaxIdleConns(10)
  235. mydb.Ping()
  236. }
  237. if Exectx == nil {
  238. Exectx, err = mydb.Begin()
  239. if err != nil {
  240. fmt.Println(err)
  241. }
  242. }
  243. if Exectx != nil {
  244. _, err = Exectx.Exec("insert into tablesqllog(`sql`) values(?);", sqlstr)
  245. if err != nil {
  246. fmt.Println("Exectx.Exec(", err)
  247. } else {
  248. insertcount++
  249. if time.Since(insertlog_lasttime) > 60*time.Second {
  250. err := Exectx.Commit()
  251. Exectx = nil
  252. if err != nil {
  253. fmt.Println("Exectx.Commit():", err)
  254. }
  255. insertlog_lasttime = time.Now() // get current time
  256. } else {
  257. if insertcount > 10000 {
  258. err := Exectx.Commit()
  259. Exectx = nil
  260. if err != nil {
  261. fmt.Println("Exectx.Commit()insertcount:", err)
  262. }
  263. insertlog_lasttime = time.Now() // get current time
  264. insertcount = 0
  265. }
  266. }
  267. }
  268. }
  269. }
  270. func Exec(mdb *sql.DB, _exsqlstr string) error {
  271. if mdb != nil {
  272. _, err := mdb.Exec(_exsqlstr)
  273. if err != nil {
  274. return err
  275. }
  276. }
  277. return nil
  278. }
  279. func ExecT(mdb *sql.Tx, _exsqlstr string) error {
  280. if mdb != nil {
  281. _, err := mdb.Exec(_exsqlstr)
  282. if err != nil {
  283. return err
  284. }
  285. }
  286. return nil
  287. }
  288. func Get(mdb *sql.DB) (int, string) {
  289. var id int
  290. var sql string
  291. sql = ""
  292. if mdb != nil {
  293. //参数绑定,可以避免sql注入
  294. rows := mdb.QueryRow("select `id`,`sql` from `tablesqllog` order by id limit 1")
  295. err := rows.Scan(&id, &sql)
  296. if err != nil {
  297. return 0, ""
  298. }
  299. }
  300. return id, sql
  301. }
  302. func delete(mdb *sql.DB, id int) error {
  303. if deletestmt == nil {
  304. deletestmt, _ = mdb.Prepare("delete from `tablesqllog` where id <= ? ")
  305. }
  306. if deletestmt != nil {
  307. _, err := deletestmt.Exec(id)
  308. if err != nil {
  309. return err
  310. }
  311. }
  312. return nil
  313. }
  314. func GetDML(e *canal.RowsEvent) string {
  315. ColumnsOld := e.Table.Columns
  316. data := make(map[string]string, 0)
  317. if _, ok := tableField[e.Table.Name]; ok {
  318. data = tableField[e.Table.Name]
  319. }
  320. if !old {
  321. for _, v := range e.Table.Columns {
  322. if strings.Index(v.Name, "pmid") > -1 {
  323. return ""
  324. }
  325. }
  326. }
  327. updateMap := make(map[string]string, 0)
  328. deleteField := []string{}
  329. if e.Action == "insert" {
  330. if len(data) > 0 {
  331. Columns := make([]schema.TableColumn, 0)
  332. for _, v := range e.Table.Columns {
  333. if _, ok := data[v.Name]; ok {
  334. v.Name = data[v.Name]
  335. }
  336. Columns = append(Columns, v)
  337. }
  338. for datak, datav := range data {
  339. if datak == "deleteField" {
  340. continue
  341. }
  342. key := datak
  343. for _, v := range Columns {
  344. if key == v.Name || datav == v.Name {
  345. key = ""
  346. break
  347. }
  348. }
  349. if key != "" {
  350. Columns = append(Columns, schema.TableColumn{
  351. Name: key,
  352. })
  353. e.Rows[0] = append(e.Rows[0], datav)
  354. }
  355. }
  356. slist := []string{}
  357. for _, v := range Columns {
  358. slist = append(slist, v.Name)
  359. }
  360. if _, ok := data["deleteField"]; ok {
  361. deleteFields := strings.Split(data["deleteField"], ",")
  362. rowindexList := []int{}
  363. for _, field := range deleteFields {
  364. index := -1
  365. for k, v := range Columns {
  366. if v.Name == field {
  367. index = k
  368. rowindexList = append(rowindexList, index)
  369. break
  370. }
  371. }
  372. }
  373. // rows := e.Rows[0]
  374. var rowList [][]interface{}
  375. for _, row := range e.Rows {
  376. var rows []interface{}
  377. for i, r := range row {
  378. exist := false
  379. for _, index := range rowindexList {
  380. if index == i {
  381. exist = true
  382. break
  383. }
  384. }
  385. if !exist {
  386. rows = append(rows, r)
  387. }
  388. }
  389. rowList = append(rowList, rows)
  390. }
  391. e.Rows = rowList
  392. // for k, _ := range e.Rows {
  393. // e.Rows[k] = rows
  394. // }
  395. // for _, field := range rowindexList {
  396. var columnsList []schema.TableColumn
  397. for i, v := range Columns {
  398. exist := false
  399. for _, index := range rowindexList {
  400. if index == i {
  401. exist = true
  402. break
  403. }
  404. }
  405. if !exist {
  406. columnsList = append(columnsList, v)
  407. }
  408. }
  409. e.Table.Columns = columnsList
  410. // }
  411. // if _, ok := data["deleteField"]; ok {
  412. // rows := e.Rows[0]
  413. // deleteFields := strings.Split(data["deleteField"], ",")
  414. // index := -1
  415. // for k, v := range Columns {
  416. // if v.Name == field {
  417. // index = k
  418. // }
  419. // }
  420. // if index == 0 {
  421. // Columns = Columns[index+1:]
  422. // rows = rows[index+1:]
  423. // } else if index == len(Columns)-1 {
  424. // Columns = Columns[:index]
  425. // rows = rows[:index]
  426. // } else if index > 0 {
  427. // var arr1 []schema.TableColumn
  428. // arr1 = append(arr1, Columns[:index]...)
  429. // arr1 = append(arr1, Columns[index+1:]...)
  430. // Columns = arr1
  431. // var arr2 []interface{}
  432. // arr2 = append(arr2, rows[:index]...)
  433. // arr2 = append(arr2, rows[index+1:]...)
  434. // rows = arr2
  435. // }
  436. }
  437. // for k, _ := range e.Rows {
  438. // e.Rows[k] = rows
  439. // }
  440. }
  441. } else {
  442. for k, v := range data {
  443. if k != "deleteField" {
  444. updateMap[k] = v
  445. } else {
  446. deleteField = strings.Split(v, ",")
  447. }
  448. }
  449. }
  450. query := ""
  451. if e.Action == "insert" {
  452. Columns := GetColumns(e)
  453. var rows []string
  454. var ids []string
  455. var idindex int
  456. for i, column := range Columns {
  457. if column == "id" {
  458. idindex = i
  459. break
  460. }
  461. }
  462. for _, value := range e.Rows {
  463. exist := false
  464. var frows []string
  465. for index, fvalue := range value {
  466. switch v := fvalue.(type) {
  467. case string:
  468. frows = append(frows, "'"+v+"'")
  469. case int8, int16, int32, int, int64:
  470. strV := fmt.Sprintf("%d", v)
  471. frows = append(frows, strV)
  472. case decimal.Decimal:
  473. frows = append(frows, v.String())
  474. case float32:
  475. frows = append(frows, strconv.FormatFloat(float64(v), 'f', -1, 32))
  476. case float64:
  477. frows = append(frows, strconv.FormatFloat(v, 'f', -1, 64))
  478. default:
  479. frows = append(frows, "NULL")
  480. }
  481. if index == idindex {
  482. if len(frows) > 0 {
  483. for _, id := range ids {
  484. if id == frows[len(frows)-1] {
  485. exist = true
  486. break
  487. }
  488. }
  489. if !exist {
  490. ids = append(ids, frows[len(frows)-1])
  491. }
  492. }
  493. }
  494. }
  495. if exist {
  496. continue
  497. }
  498. if strings.Trim(nodename, " ") != "" {
  499. frows = append(frows, "'"+nodevalue+"'")
  500. }
  501. rows = append(rows, "("+strings.Join(frows, ",")+")")
  502. }
  503. query = fmt.Sprintf(Insert, e.Table.Name, strings.Join(Columns, ","), strings.Join(rows, ","))
  504. } else if e.Action == "update" {
  505. var rows []string
  506. var where []string
  507. var uset []string
  508. for i, value := range e.Rows {
  509. if (i % 2) == 0 {
  510. where = append([]string{})
  511. for _, pk := range e.Table.PKColumns {
  512. name := e.Table.Columns[pk].Name
  513. if !old {
  514. if _, ok := updateMap[e.Table.Columns[pk].Name]; ok {
  515. name = updateMap[e.Table.Columns[pk].Name]
  516. }
  517. }
  518. isExist := false
  519. for _, fname := range deleteField {
  520. if fname == name {
  521. isExist = true
  522. break
  523. }
  524. }
  525. if isExist {
  526. continue
  527. }
  528. switch v := value[pk].(type) {
  529. case string:
  530. where = append(where, "`"+name+"` = '"+v+"'")
  531. case int8, int16, int32, int, int64:
  532. strV := fmt.Sprintf("%d", v)
  533. where = append(where, "`"+name+"` = "+strV)
  534. case decimal.Decimal:
  535. where = append(where, "`"+name+"` = "+v.String())
  536. case float32:
  537. where = append(where, "`"+name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
  538. case float64:
  539. where = append(where, "`"+name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
  540. default:
  541. where = append(where, "`"+name+"` is NULL")
  542. }
  543. }
  544. if strings.Trim(nodename, " ") != "" {
  545. where = append(where, nodename+" = "+"'"+nodevalue+"'")
  546. }
  547. // isExist := false
  548. // for _, w := range where {
  549. // if strings.Index(w, "pastureid") >= 0 {
  550. // isExist = true
  551. // break
  552. // }
  553. // }
  554. // if old && !isExist {
  555. // where = append(where, " pastureid"+" = "+"'"+pastureid+"'")
  556. // } else if !isExist {
  557. // where = append(where, " pasturecode"+" = "+"'"+pastureid+"'")
  558. // }
  559. } else {
  560. uset = append([]string{})
  561. for j, _ := range e.Table.Columns {
  562. xx := reflect.DeepEqual(e.Rows[i-1][j], value[j])
  563. if !xx {
  564. name := e.Table.Columns[j].Name
  565. if !old {
  566. if _, ok := updateMap[e.Table.Columns[j].Name]; ok {
  567. name = updateMap[e.Table.Columns[j].Name]
  568. }
  569. }
  570. isExist := false
  571. for _, fname := range deleteField {
  572. if fname == name {
  573. isExist = true
  574. break
  575. }
  576. }
  577. if isExist {
  578. continue
  579. }
  580. switch v := value[j].(type) {
  581. case string:
  582. uset = append(uset, "`"+name+"` = '"+v+"'")
  583. case int8, int16, int32, int, int64:
  584. strV := fmt.Sprintf("%d", v)
  585. uset = append(uset, "`"+name+"` = "+strV)
  586. case decimal.Decimal:
  587. uset = append(uset, "`"+name+"` = "+v.String())
  588. case float32:
  589. uset = append(uset, "`"+name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
  590. case float64:
  591. uset = append(uset, "`"+name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
  592. default:
  593. uset = append(uset, "`"+name+"` = NULL")
  594. }
  595. }
  596. }
  597. if len(uset) == 0 {
  598. return ""
  599. }
  600. rows = append(rows, fmt.Sprintf(Update, e.Table.Name, strings.Join(uset, ", "), strings.Join(where, " and ")))
  601. }
  602. }
  603. query = strings.Join(rows, "\n")
  604. } else if e.Action == "delete" {
  605. var rows []string
  606. for _, value := range e.Rows {
  607. var where []string
  608. for _, pk := range e.Table.PKColumns {
  609. name := e.Table.Columns[pk].Name
  610. if !old {
  611. if _, ok := updateMap[e.Table.Columns[pk].Name]; ok {
  612. name = updateMap[e.Table.Columns[pk].Name]
  613. }
  614. }
  615. isExist := false
  616. for _, fname := range deleteField {
  617. if fname == name {
  618. isExist = true
  619. break
  620. }
  621. }
  622. if isExist {
  623. continue
  624. }
  625. switch v := value[pk].(type) {
  626. case string:
  627. where = append(where, "`"+e.Table.Columns[pk].Name+"` = '"+v+"'")
  628. case int8, int16, int32, int, int64:
  629. strV := fmt.Sprintf("%d", v)
  630. where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strV)
  631. case float32:
  632. where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
  633. case float64:
  634. where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
  635. default:
  636. where = append(where, "`"+e.Table.Columns[pk].Name+"` is NULL")
  637. }
  638. }
  639. if strings.Trim(nodename, " ") != "" {
  640. where = append(where, nodename+" = "+"'"+nodevalue+"'")
  641. }
  642. // isExist := false
  643. // for _, w := range where {
  644. // if strings.Index(w, "pastureid") >= 0 {
  645. // isExist = true
  646. // break
  647. // }
  648. // }
  649. // if !old {
  650. // where = append(where, " pastureid"+" = "+"'"+pastureid+"'")
  651. // } else if isExist {
  652. // where = append(where, " pasturecode"+" = "+"'"+pastureid+"'")
  653. // }
  654. rows = append(rows, fmt.Sprintf(Delete, e.Table.Name, strings.Join(where, " and ")))
  655. }
  656. query = strings.Join(rows, "\n")
  657. }
  658. e.Table.Columns = ColumnsOld
  659. return query
  660. }
// handler is the canal event handler used by this package. It embeds
// canal.DummyEventHandler and defines OnRow, OnRotate, OnPosSynced and String
// itself; every other handler method comes from the embedded type.
type handler struct {
	canal.DummyEventHandler
}
  664. func OpenFile(filename string) (*os.File, error) {
  665. if _, err := os.Stat(filename); os.IsNotExist(err) {
  666. fmt.Println("文件不存在")
  667. return os.Create(filename) //创建文件
  668. }
  669. fmt.Println("文件存在")
  670. return os.OpenFile(filename, os.O_APPEND, 0666) //打开文件
  671. }
  672. func (h *handler) OnRow(e *canal.RowsEvent) error {
  673. needsave := 0
  674. if len(tables) > 0 && len(tableDB) > 0 {
  675. subs := strings.Split(tables, ",")
  676. for _, value := range subs {
  677. if strings.ToLower(e.Table.Name) == strings.ToLower(value) {
  678. needsave = 1
  679. break
  680. }
  681. }
  682. } else {
  683. needsave = 1
  684. }
  685. if needsave > 0 {
  686. s := GetDML(e)
  687. if len(s) > 0 {
  688. if showlog > 0 {
  689. // fmt.Println(s)
  690. }
  691. if strings.Index(s, "downloadedplan") > 0 {
  692. // fmt.Println(s)
  693. }
  694. //insertlog(s)
  695. //sqlDeque.PushBack(s)
  696. //mysqlQueue.Enqueue(s)
  697. //WriteChWithSelect(mysqlch, s)
  698. //savesqltofile(s)
  699. err := DqueueIndex.PushOneIndex([]byte(s))
  700. if err != nil {
  701. fmt.Println("DqueueIndex.PushOneIndex([]byte(sqls)) err:", err)
  702. }
  703. }
  704. }
  705. return nil
  706. }
  707. func (h *handler) OnRotate(roateEvent *replication.RotateEvent) error {
  708. //writePosini(string(roateEvent.NextLogName), uint32(roateEvent.Position))
  709. setini(string(roateEvent.NextLogName), uint32(roateEvent.Position))
  710. return nil
  711. }
  712. func (h *handler) OnPosSynced(p mysql.Position, G mysql.GTIDSet, g bool) error {
  713. // fmt.Println(p.Name + " | " + strconv.Itoa(int(p.Pos)))
  714. endName = p.Name
  715. endPos = p.Pos
  716. //writePosini(p.Name, p.Pos)
  717. setini(p.Name, p.Pos)
  718. return nil
  719. }
  720. func (h *handler) String() string {
  721. return "TestHandler"
  722. }
  723. func Readini() {
  724. CurrentPath, _ = GetCurrentPath()
  725. h, _ := log.NewRotatingFileHandler(CurrentPath+"Consumer.log", 40*1024*1024, 10)
  726. logger = log.NewDefault(h)
  727. logger.SetLevelByName("info")
  728. inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
  729. if err != nil {
  730. logger.Errorln("读取配置文件失败1[config.ini]", err.Error())
  731. return
  732. }
  733. PosChan = make(chan Posname, 10000)
  734. go runsetini()
  735. host = inicfg.MustValue("canal", "host", "127.0.0.1")
  736. port = inicfg.MustInt("canal", "port", 3306)
  737. user = inicfg.MustValue("canal", "user", "root")
  738. password = inicfg.MustValue("canal", "password", "root")
  739. flavor = inicfg.MustValue("canal", "flavor", "mariadb")
  740. serverID = inicfg.MustInt("canal", "serverID", 101)
  741. mysqldump = inicfg.MustValue("canal", "mysqldump", "mysqldump")
  742. tables = inicfg.MustValue("canal", "tables", "")
  743. tableDB = inicfg.MustValue("canal", "tableDB", "")
  744. ignoreTables = inicfg.MustValue("canal", "ignoreTables", "")
  745. startName = inicfg.MustValue("canal", "startName", "")
  746. startPos = uint(inicfg.MustInt("canal", "startPos", 0))
  747. endName = startName
  748. endPos = uint32(startPos)
  749. heartbeatPeriod = inicfg.MustInt("canal", "heartbeatPeriod", 60)
  750. readTimeout = inicfg.MustInt("canal", "readTimeout", 90)
  751. nodename = inicfg.MustValue("canal", "nodename", "")
  752. nodevalue = inicfg.MustValue("canal", "nodevalue", "1")
  753. showlog = inicfg.MustInt("canal", "showlog", 1)
  754. onetimerows = inicfg.MustValue("canal", "onetimerows", "1000")
  755. KafkaEnable = inicfg.MustBool("kafka", "kafkaEnable", false)
  756. kafka_host = inicfg.MustValue("kafka", "kafka_host", "127.0.0.1")
  757. kafka_port = inicfg.MustInt("kafka", "kafka_port", 9092)
  758. kafka_topic = inicfg.MustValue("kafka", "kafka_topic", "kafka_go_test")
  759. saslEnable = inicfg.MustBool("kafka", "saslEnable", false)
  760. username = inicfg.MustValue("kafka", "username", "root")
  761. saslpassword = inicfg.MustValue("kafka", "saslpassword", "root")
  762. tlsEnable = inicfg.MustBool("kafka", "tlsEnable", false)
  763. clientcert = inicfg.MustValue("kafka", "clientcert", "")
  764. clientkey = inicfg.MustValue("kafka", "clientkey", "")
  765. cacert = inicfg.MustValue("kafka", "cacert", "")
  766. MqttEnable = inicfg.MustBool("mqtt", "mqttEnable", false)
  767. KptCattleId = inicfg.MustValue("mqtt", "kptCattleId", "888888")
  768. mqtt_nodevalue = nodevalue
  769. mqtt_host = inicfg.MustValue("mqtt", "host", "127.0.0.1")
  770. mqtt_port = inicfg.MustInt("mqtt", "port", 9092)
  771. mqtt_path = inicfg.MustValue("mqtt", "path", "")
  772. mqtt_qos = inicfg.MustInt("mqtt", "qos", 0)
  773. mqtt_topic = inicfg.MustValue("mqtt", "topic", "mqtt_topic")
  774. mqtt_saslEnable = inicfg.MustBool("mqtt", "saslEnable", false)
  775. mqtt_username = inicfg.MustValue("mqtt", "username", "127.0.0.1")
  776. mqtt_saslpassword = inicfg.MustValue("mqtt", "saslpassword", "")
  777. mqtt_tlsEnable = inicfg.MustBool("mqtt", "tlsEnable", false)
  778. mqtt_clientcert = inicfg.MustValue("mqtt", "clientcert", "")
  779. mqtt_clientkey = inicfg.MustValue("mqtt", "clientkey", "")
  780. mqtt_cacert = inicfg.MustValue("mqtt", "cacert", "")
  781. mqtt_CleanSession = inicfg.MustBool("mqtt", "CleanSession", true)
  782. old = inicfg.MustBool("mqtt", "old", true)
  783. pastureid = inicfg.MustValue("canal", "pastureid", "0")
  784. tableField = make(map[string]map[string]string, 0)
  785. for _, table := range strings.Split(tables, ",") {
  786. dataMap, _ := inicfg.GetSection(table)
  787. if dataMap != nil {
  788. tableField[table] = dataMap
  789. }
  790. }
  791. // fmt.Println(tableField)
  792. ServiceName = inicfg.MustValue("Service", "ServiceName", "KPTDataService")
  793. ServiceDisplayName = inicfg.MustValue("Service", "ServiceDisplayName", "KPTDataService")
  794. ServiceDescription = inicfg.MustValue("Service", "ServiceDescription", "科湃腾数据同步")
  795. openini()
  796. file := CurrentPath + "/pos/pos.db"
  797. defer slowpoke.Close(CurrentPath + "pos/pos.db")
  798. keyN := []byte("startName")
  799. keyP := []byte("startPos")
  800. resN, err := slowpoke.Get(file, keyN)
  801. if err != nil || string(resN) == "" {
  802. logger.Error(" slowpoke.Get startName err", err, " posname :", resN)
  803. } else {
  804. res, err := slowpoke.Get(file, keyP)
  805. if err != nil {
  806. logger.Error(" slowpoke.Get startPos err", err)
  807. } else {
  808. str, err := strconv.Atoi(string(res))
  809. if err != nil || str == 0 {
  810. logger.Error("strconv.Atoi err", err, " pos :", str)
  811. } else {
  812. startName = string(resN)
  813. startPos = uint(str)
  814. }
  815. logger.Info("read pos name ", startPos, startName)
  816. }
  817. }
  818. logger.Info("end pos name", startPos, startName)
  819. }
  820. func initPos(_user, _password, _host, _dbs string, _port int) (int, string, error) {
  821. connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, "mysql")
  822. var err error
  823. posdb, err = sql.Open("mysql", connecting)
  824. if err == nil {
  825. posdb.SetMaxOpenConns(10)
  826. posdb.SetMaxIdleConns(10)
  827. } else {
  828. return 0, "", err
  829. }
  830. _, err = posdb.Exec("CREATE DATABASE IF NOT EXISTS `sqllog`")
  831. posdb.Close()
  832. connecting = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, "sqllog")
  833. posdb, err = sql.Open("mysql", connecting)
  834. if err == nil {
  835. posdb.SetMaxOpenConns(10)
  836. posdb.SetMaxIdleConns(10)
  837. } else {
  838. return 0, "", err
  839. }
  840. _, _ = posdb.Exec("CREATE TABLE IF NOT EXISTS `mypos` (`id` INT(11) NOT NULL ,`pos` INT(11) DEFAULT 0,`file` VARCHAR(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=INNODB DEFAULT CHARSET=utf8;")
  841. rows := posdb.QueryRow("select pos,file from mypos where id = 0 ")
  842. pos := 0
  843. file := ""
  844. if err := rows.Scan(&pos, &file); err != nil {
  845. _, err = posdb.Exec("insert mypos(id,pos,file) value (0,0,'')")
  846. }
  847. return pos, file, err
  848. }
  849. func openini() {
  850. CurrentPath, _ = GetCurrentPath()
  851. var err error
  852. inicfg, err = goconfig.LoadConfigFile(CurrentPath + "config.ini")
  853. if err != nil {
  854. logger.Errorln("读取配置文件失败[config.ini]", err.Error())
  855. return
  856. }
  857. }
  858. func setini(name string, pos uint32) {
  859. posname := Posname{name, pos}
  860. PosChan <- posname
  861. }
  862. func runsetini() {
  863. for {
  864. posname := <-PosChan
  865. logger.Info("binlog pos :", posname.pos, " binlog posname :", posname.name)
  866. CurrentPath, _ = GetCurrentPath()
  867. inicfg.SetValue("canal", "startName", posname.name)
  868. inicfg.SetValue("canal", "startPos", strconv.Itoa(int(posname.pos)))
  869. file := CurrentPath + "pos/pos.db"
  870. err := slowpoke.Set(file, []byte("startName"), []byte(posname.name))
  871. if err != nil {
  872. logger.Error(" slowpoke.Set startName err", err)
  873. }
  874. err = slowpoke.Set(file, []byte("startPos"), []byte(strconv.Itoa(int(posname.pos))))
  875. if err != nil {
  876. logger.Error(" slowpoke.Set startPos err", err)
  877. }
  878. err = slowpoke.Close(CurrentPath + "pos/pos.db")
  879. if err != nil {
  880. logger.Error(" slowpoke.Close err", err)
  881. }
  882. }
  883. }
  884. //func saveini() {
  885. //
  886. //
  887. //}
  888. func writeini(name string, pos uint32) {
  889. CurrentPath, _ = GetCurrentPath()
  890. inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
  891. if err != nil {
  892. logger.Infoln("读取配置文件失败[config.ini]")
  893. return
  894. }
  895. inicfg.SetValue("canal", "startName", name)
  896. inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
  897. err = goconfig.SaveConfigFile(inicfg, CurrentPath+"config.ini")
  898. if err != nil {
  899. logger.Infoln("保存配置文件失败[config.ini]")
  900. return
  901. }
  902. }
  903. func writeini1(name string, pos uint32) {
  904. CurrentPath, _ = GetCurrentPath()
  905. inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
  906. if err != nil {
  907. logger.Infoln("读取配置文件失败[config.ini]")
  908. return
  909. }
  910. inicfg.SetValue("canal", "startName", name)
  911. inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
  912. err = goconfig.SaveConfigFile(inicfg, CurrentPath+"config.ini")
  913. if err != nil {
  914. logger.Infoln("保存配置文件失败[config.ini]")
  915. return
  916. }
  917. }
  918. func writePosini(name string, pos uint32) {
  919. println("pos", pos)
  920. tx, _ := posdb.Begin()
  921. _, err := tx.Exec("update mypos set pos = ? , file = ? where id = 0", pos, name)
  922. if err != nil {
  923. tx.Rollback()
  924. } else {
  925. tx.Commit()
  926. }
  927. //CurrentPath, _ = GetCurrentPath()
  928. //inicfg, err := goconfig.LoadConfigFile(CurrentPath + "configPos.ini")
  929. //if err != nil {
  930. // logger.Errorln("读取配置文件失败[configPos.ini]")
  931. // return
  932. //}
  933. //inicfg.SetValue("canal", "startName", name)
  934. //inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
  935. //err = goconfig.SaveConfigFile(inicfg, CurrentPath+"configPos.ini")
  936. //if err != nil {
  937. // logger.Errorln("保存配置文件失败[configPos.ini]")
  938. // return
  939. //}
  940. }
  // IntToBytes encodes n as an int64 in big-endian byte order and returns the
  // resulting 8 bytes.
  func IntToBytes(n int) []byte {
  	var encoded bytes.Buffer
  	// The error is ignored: int64 is a fixed-size value and writes to a
  	// bytes.Buffer do not fail.
  	_ = binary.Write(&encoded, binary.BigEndian, int64(n))
  	return encoded.Bytes()
  }
  // BytesToInt decodes the first 8 bytes of bys as a big-endian int64 and
  // returns it as an int. When bys holds fewer than 8 bytes the read fails, the
  // error is ignored and 0 is returned.
  func BytesToInt(bys []byte) int {
  	var decoded int64
  	_ = binary.Read(bytes.NewReader(bys), binary.BigEndian, &decoded)
  	return int(decoded)
  }
  953. func RunService() {
  954. //Readini()
  955. var err error
  956. DqueueIndex, err = OpenIndexFile(CurrentPath+"logs", 1000000, 10000, false)
  957. if err != nil {
  958. fmt.Printf("create DqueueIndex err %v", err)
  959. os.Exit(1)
  960. }
  961. cfg = canal.NewDefaultConfig()
  962. cfg.Addr = fmt.Sprintf("%s:%d", host, port)
  963. cfg.User = user
  964. cfg.Password = password
  965. cfg.Flavor = flavor
  966. cfg.UseDecimal = true
  967. cfg.SemiSyncEnabled = true
  968. // cfg.ReadTimeout = time.Duration * 10
  969. cfg.ReadTimeout = time.Duration(readTimeout) * time.Second
  970. cfg.HeartbeatPeriod = time.Duration(heartbeatPeriod) * time.Second
  971. cfg.ServerID = uint32(serverID)
  972. cfg.Dump.ExecutionPath = CurrentPath + mysqldump
  973. cfg.Dump.DiscardErr = false
  974. cfg.Dump.MaxAllowedPacketMB = 500
  975. //cfg.Dump.TableDB = "tmrwatch"
  976. c, err := canal.NewCanal(cfg)
  977. if err != nil {
  978. fmt.Printf("create canal err %v", err)
  979. os.Exit(1)
  980. }
  981. if len(ignoreTables) > 0 {
  982. subs := strings.Split(ignoreTables, ",")
  983. for _, sub := range subs {
  984. if seps := strings.Split(sub, "."); len(seps) == 2 {
  985. c.AddDumpIgnoreTables(seps[0], seps[1])
  986. }
  987. }
  988. }
  989. if len(tables) > 0 && len(tableDB) > 0 {
  990. subs := strings.Split(tables, ",")
  991. c.AddDumpTables(tableDB, subs...)
  992. } else if len(tableDB) > 0 {
  993. subs := strings.Split(tableDB, ",")
  994. c.AddDumpDatabases(subs...)
  995. }
  996. c.SetEventHandler(&handler{})
  997. startPos_ := mysql.Position{
  998. Name: startName,
  999. Pos: uint32(startPos),
  1000. }
  1001. ctx := context.Background()
  1002. ctx, cancel := context.WithCancel(ctx)
  1003. go func() {
  1004. if !(startPos > 0) {
  1005. initDB()
  1006. //SqlBitcask.DeleteAll()
  1007. //SqlBitcask.Merge()
  1008. insertEmpty()
  1009. }
  1010. err = c.RunFrom(startPos_)
  1011. if err != nil { //出错后,将重头开始上传
  1012. fmt.Printf("start canal err %v", err)
  1013. logger.Error("start canal err %v", err)
  1014. c.Close()
  1015. c, err := canal.NewCanal(cfg)
  1016. if err != nil {
  1017. fmt.Printf("create canal err %v", err)
  1018. os.Exit(1)
  1019. }
  1020. if len(ignoreTables) > 0 {
  1021. subs := strings.Split(ignoreTables, ",")
  1022. for _, sub := range subs {
  1023. if seps := strings.Split(sub, "."); len(seps) == 2 {
  1024. c.AddDumpIgnoreTables(seps[0], seps[1])
  1025. }
  1026. }
  1027. }
  1028. if len(tables) > 0 && len(tableDB) > 0 {
  1029. subs := strings.Split(tables, ",")
  1030. c.AddDumpTables(tableDB, subs...)
  1031. } else if len(tableDB) > 0 {
  1032. subs := strings.Split(tableDB, ",")
  1033. c.AddDumpDatabases(subs...)
  1034. }
  1035. c.SetEventHandler(&handler{})
  1036. ctx := context.Background()
  1037. ctx, cancel = context.WithCancel(ctx)
  1038. startPos_ = mysql.Position{
  1039. Name: "",
  1040. Pos: 0,
  1041. }
  1042. // initDB()
  1043. insertEmpty()
  1044. err = c.RunFrom(startPos_)
  1045. logger.Error("start canal err second %v", err)
  1046. }
  1047. }()
  1048. // s := cron.New()
  1049. // s.AddFunc("00 40 07 * * *", func() {
  1050. // insertEmpty()
  1051. // fmt.Printf("start canal err %v", err)
  1052. // logger.Error("start canal err %v", err)
  1053. // c.Close()
  1054. // c, err := canal.NewCanal(cfg)
  1055. // if err != nil {
  1056. // fmt.Printf("create canal err %v", err)
  1057. // os.Exit(1)
  1058. // }
  1059. // if len(ignoreTables) > 0 {
  1060. // subs := strings.Split(ignoreTables, ",")
  1061. // for _, sub := range subs {
  1062. // if seps := strings.Split(sub, "."); len(seps) == 2 {
  1063. // c.AddDumpIgnoreTables(seps[0], seps[1])
  1064. // }
  1065. // }
  1066. // }
  1067. // if len(tables) > 0 && len(tableDB) > 0 {
  1068. // subs := strings.Split(tables, ",")
  1069. // c.AddDumpTables(tableDB, subs...)
  1070. // } else if len(tableDB) > 0 {
  1071. // subs := strings.Split(tableDB, ",")
  1072. // c.AddDumpDatabases(subs...)
  1073. // }
  1074. // c.SetEventHandler(&handler{})
  1075. // ctx := context.Background()
  1076. // ctx, cancel = context.WithCancel(ctx)
  1077. // startPos_ = mysql.Position{
  1078. // Name: "",
  1079. // Pos: 0,
  1080. // }
  1081. // // initDB()
  1082. // err = c.RunFrom(startPos_)
  1083. // logger.Error("start canal err second %v", err)
  1084. // })
  1085. // s.Start()
  1086. //监听
  1087. //ctxListen := context.Background()
  1088. //ctxListen, cancelListen := context.WithCancel(ctxListen)
  1089. //go func() {
  1090. // ticker := time.Tick(60 * time.Second)
  1091. // for {
  1092. // select {
  1093. // case <-ticker:
  1094. // if time.Since(insertlog_lasttime) > 60*time.Second && Exectx != nil {
  1095. //
  1096. // err := Exectx.Commit()
  1097. // Exectx = nil
  1098. // if err != nil {
  1099. // fmt.Println("Exectx.Commit()ticker err:", err)
  1100. // }
  1101. // insertlog_lasttime = time.Now()
  1102. // }
  1103. // case <-ctxListen.Done():
  1104. // if Exectx != nil {
  1105. //
  1106. // err := Exectx.Commit()
  1107. // Exectx = nil
  1108. // if err != nil {
  1109. // fmt.Println("Exectx.Commit()ticker err:", err)
  1110. // }
  1111. // insertlog_lasttime = time.Now()
  1112. // fmt.Println("Exectx.Commit()tickerEXIT:", err)
  1113. // return
  1114. // }
  1115. //
  1116. // }
  1117. // }
  1118. //
  1119. //}()
  1120. if KafkaEnable {
  1121. go Kafka_producerDB(ctx)
  1122. // go Kafka_producerXXWDB(ctx)
  1123. } else if MqttEnable {
  1124. go Mqtt_producerDB(ctx)
  1125. }
  1126. Ch = make(chan int, 1)
  1127. <-Ch
  1128. //cancelListen()
  1129. cancel()
  1130. DqueueIndex.CloseIndex()
  1131. //setini(endName,endPos)
  1132. //writeini(endName,endPos)
  1133. fmt.Println("程序被关闭 ")
  1134. c.Close()
  1135. }