config2.ini 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266
  1. package kpt
  2. import (
  3. "bytes"
  4. "context"
  5. "database/sql"
  6. "encoding/binary"
  7. "errors"
  8. "fmt"
  9. "os"
  10. "os/exec"
  11. "path/filepath"
  12. "reflect"
  13. "runtime"
  14. "strconv"
  15. "strings"
  16. "time"
  17. "github.com/Unknwon/goconfig"
  18. "github.com/go-mysql-org/go-mysql/canal"
  19. "github.com/go-mysql-org/go-mysql/mysql"
  20. "github.com/go-mysql-org/go-mysql/replication"
  21. "github.com/go-mysql-org/go-mysql/schema"
  22. _ "github.com/go-sql-driver/mysql" //导入mysql驱动包
  23. "github.com/recoilme/slowpoke"
  24. "github.com/robfig/cron"
  25. "github.com/shopspring/decimal"
  26. "github.com/siddontang/go-log/log"
  27. )
  28. var (
  29. host string //"MySQL host")
  30. port int //"MySQL port")
  31. user string // "MySQL user, must have replication privilege")
  32. password string // "MySQL password")
  33. flavor string // "Flavor: mysql or mariadb")
  34. serverID int // "Unique Server ID")
  35. mysqldump string // "mysqldump execution path")
  36. tables string // "dump tables, seperated by comma, will overwrite dbs")
  37. tableDB string // "database for dump tables")
  38. ignoreTables string // "ignore tables, must be database.table format, separated by comma")
  39. startName string // "start sync from binlog name")
  40. startPos uint // "start sync from binlog position of")
  41. endName string // "start sync from binlog name")
  42. endPos uint32 // "start sync from binlog position of")
  43. heartbeatPeriod int // "master heartbeat period")
  44. readTimeout int // "connection read timeout")
  45. nodename string //nodename key name
  46. nodevalue string //nodevalue value
  47. showlog int
  48. ServiceName string //服务显示名称
  49. ServiceDisplayName string //服务名称
  50. ServiceDescription string //服务描述
  51. KafkaEnable bool
  52. MqttEnable bool
  53. CurrentPath string
  54. mydb *sql.DB // 全局的数据库操作句柄
  55. posdb *sql.DB // 全局的数据库操作句柄
  56. inicfg *goconfig.ConfigFile
  57. PosChan chan Posname
  58. stmt *sql.Stmt
  59. deletestmt *sql.Stmt
  60. logger *log.Logger
  61. Ch chan int
  62. insertstmt string = ""
  63. insertcount int = 0
  64. insertcountv int = 0
  65. Exectx *sql.Tx
  66. insertlog_lasttime time.Time
  67. DqueueIndex *DQueue
  68. DqueueIndexMqtt *DQueue
  69. mqttDb *sql.DB
  70. KptCattleId string
  71. old bool
  72. tableField map[string]map[string]string
  73. pastureid string
  74. i int
  75. )
  76. const (
  77. Type = "mysql"
  78. Insert = `INSERT INTO %s(%s) VALUES%s;`
  79. Update = `UPDATE %s SET %s WHERE %s;`
  80. Delete = `DELETE FROM %s WHERE %s;`
  81. DeleteAll = `TRUNCATE TABLE %s`
  82. )
  83. type Posname struct {
  84. name string
  85. pos uint32
  86. }
  87. func ReadWithSelect(sqlch chan string) (x string, res bool) {
  88. select {
  89. case x = <-sqlch:
  90. return x, true
  91. default:
  92. return "", false
  93. }
  94. }
  95. func WriteChWithSelect(sqlch chan string, sql string) bool {
  96. timeout := time.NewTimer(time.Microsecond * 500)
  97. select {
  98. case sqlch <- sql:
  99. return true
  100. case <-timeout.C:
  101. {
  102. logger.Errorf("WriteChWithSelect 超时: %s", sql)
  103. fmt.Printf("WriteChWithSelect 超时: %s", sql)
  104. return false
  105. }
  106. }
  107. }
  108. func GetCurrentPath() (string, error) {
  109. file, err := exec.LookPath(os.Args[0])
  110. if err != nil {
  111. return "", err
  112. }
  113. path, err := filepath.Abs(file)
  114. if err != nil {
  115. return "", err
  116. }
  117. //fmt.Println("path111:", path)
  118. if runtime.GOOS == "windows" {
  119. path = strings.Replace(path, "\\", "/", -1)
  120. }
  121. //fmt.Println("path222:", path)
  122. i := strings.LastIndex(path, "/")
  123. if i < 0 {
  124. return "", errors.New(`Can't find "/" or "\".`)
  125. }
  126. //fmt.Println("path333:", path)
  127. return string(path[0 : i+1]), nil
  128. }
  129. func GetColumns(e *canal.RowsEvent) []string {
  130. var Columns []string
  131. for _, value := range e.Table.Columns {
  132. Columns = append(Columns, "`"+value.Name+"`")
  133. }
  134. if strings.Trim(nodename, " ") != "" {
  135. Columns = append(Columns, strings.Trim(nodename, " "))
  136. }
  137. return Columns
  138. }
  139. func initDB() {
  140. err := GetDbsConnect(user, password, host, port, "mysql")
  141. _, err = mydb.Exec("CREATE DATABASE IF NOT EXISTS `sqllog`")
  142. mydb.Close()
  143. err = GetDbsConnect(user, password, host, port, "sqllog")
  144. if err == nil {
  145. _, _ = mydb.Exec("CREATE TABLE `tablesqllog` (`id` int(11) NOT NULL AUTO_INCREMENT,`sql` text DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;")
  146. insertEmpty()
  147. }
  148. }
  149. func GetDbsConnect(_user, _password, _host string, _port int, _dbs string) error {
  150. connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, _dbs)
  151. var err error
  152. mydb, err = sql.Open("mysql", connecting)
  153. if err == nil {
  154. mydb.SetMaxOpenConns(10)
  155. mydb.SetMaxIdleConns(10)
  156. } else {
  157. return err
  158. }
  159. return nil
  160. }
  161. func GetmyDbsConnect(_user, _password, _host string, _port int, _dbs string) (*sql.DB, error) {
  162. connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, _dbs)
  163. mdb, err := sql.Open("mysql", connecting)
  164. if err == nil {
  165. mdb.SetMaxOpenConns(10)
  166. mdb.SetMaxIdleConns(10)
  167. } else {
  168. return nil, err
  169. }
  170. return mdb, nil
  171. }
  172. func insertEmpty() {
  173. if len(tables) > 0 && len(tableDB) > 0 {
  174. subs := strings.Split(tables, ",")
  175. for i := len(subs) - 1; i >= 0; i-- {
  176. var where []string
  177. where = append(where, " 1 = 1")
  178. if strings.Trim(nodename, " ") != "" {
  179. where = append(where, nodename+" = "+"'"+nodevalue+"'")
  180. }
  181. sqls := fmt.Sprintf(Delete, subs[i], strings.Join(where, " and "))
  182. //insertlog(sqls)
  183. //savesqltofile(sqls)
  184. err := DqueueIndex.PushOneIndex([]byte(sqls))
  185. if err != nil {
  186. fmt.Println("DqueueIndex.PushOneIndex([]byte(sqls)) err:", err)
  187. }
  188. }
  189. //for _, value := range subs {
  190. //}
  191. }
  192. }
  193. //func () {
  194. // for i:= 0 ; i<100000000;i++ {
  195. // select {
  196. // case <-c2 :
  197. // fmt.Println("c2退出")
  198. // return
  199. // default:
  200. // err =dqueuePop.PushOneIndex(
  201. // []byte("a"+strconv.Itoa(i)),
  202. // )
  203. // if err != nil{
  204. // fmt.Println("dqueuePop.PushOneIndex err",err)
  205. //
  206. // }else {
  207. // fmt.Println("dqueuePop.PushOneIndex sucess","a"+strconv.Itoa(i))
  208. // }
  209. // //time.Sleep(1*time.Millisecond)
  210. // }
  211. // //err =dqueue.Push([][]byte{
  212. // // []byte("a"+strconv.Itoa(i)),
  213. // // []byte("b"+strconv.Itoa(i)),
  214. // //})
  215. // }
  216. //}
  217. func appendinsertsql(sqlstr string) {
  218. if insertcount == 0 {
  219. insertstmt = "insert into tablesqllog(`sql`) values(\"" + sqlstr + "\")"
  220. } else {
  221. insertstmt = insertstmt + ",(\"" + sqlstr + "\")"
  222. }
  223. insertcount++
  224. }
  225. func insertlog(sqlstr string) {
  226. var err error
  227. if mydb == nil {
  228. connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", user, password, host, port, "sqllog")
  229. mydb, err = sql.Open("mysql", connecting)
  230. if err != nil {
  231. fmt.Println(" sql.Open err", err)
  232. }
  233. mydb.SetMaxOpenConns(100)
  234. mydb.SetMaxIdleConns(10)
  235. mydb.Ping()
  236. }
  237. if Exectx == nil {
  238. Exectx, err = mydb.Begin()
  239. if err != nil {
  240. fmt.Println(err)
  241. }
  242. }
  243. if Exectx != nil {
  244. _, err = Exectx.Exec("insert into tablesqllog(`sql`) values(?);", sqlstr)
  245. if err != nil {
  246. fmt.Println("Exectx.Exec(", err)
  247. } else {
  248. insertcount++
  249. if time.Since(insertlog_lasttime) > 60*time.Second {
  250. err := Exectx.Commit()
  251. Exectx = nil
  252. if err != nil {
  253. fmt.Println("Exectx.Commit():", err)
  254. }
  255. insertlog_lasttime = time.Now() // get current time
  256. } else {
  257. if insertcount > 10000 {
  258. err := Exectx.Commit()
  259. Exectx = nil
  260. if err != nil {
  261. fmt.Println("Exectx.Commit()insertcount:", err)
  262. }
  263. insertlog_lasttime = time.Now() // get current time
  264. insertcount = 0
  265. }
  266. }
  267. }
  268. }
  269. }
  270. func Exec(mdb *sql.DB, _exsqlstr string) error {
  271. if mdb != nil {
  272. _, err := mdb.Exec(_exsqlstr)
  273. if err != nil {
  274. return err
  275. }
  276. }
  277. return nil
  278. }
  279. func ExecT(mdb *sql.Tx, _exsqlstr string) error {
  280. if mdb != nil {
  281. _, err := mdb.Exec(_exsqlstr)
  282. if err != nil {
  283. return err
  284. }
  285. }
  286. return nil
  287. }
  288. func Get(mdb *sql.DB) (int, string) {
  289. var id int
  290. var sql string
  291. sql = ""
  292. if mdb != nil {
  293. //参数绑定,可以避免sql注入
  294. rows := mdb.QueryRow("select `id`,`sql` from `tablesqllog` order by id limit 1")
  295. err := rows.Scan(&id, &sql)
  296. if err != nil {
  297. return 0, ""
  298. }
  299. }
  300. return id, sql
  301. }
  302. func delete(mdb *sql.DB, id int) error {
  303. if deletestmt == nil {
  304. deletestmt, _ = mdb.Prepare("delete from `tablesqllog` where id <= ? ")
  305. }
  306. if deletestmt != nil {
  307. _, err := deletestmt.Exec(id)
  308. if err != nil {
  309. return err
  310. }
  311. }
  312. return nil
  313. }
  314. func GetDML(e *canal.RowsEvent) string {
  315. ColumnsOld := e.Table.Columns
  316. data := make(map[string]string, 0)
  317. if _, ok := tableField[e.Table.Name]; ok {
  318. data = tableField[e.Table.Name]
  319. }
  320. if !old {
  321. for _, v := range e.Table.Columns {
  322. if strings.Index(v.Name, "pmid") > -1 {
  323. return ""
  324. }
  325. }
  326. }
  327. updateMap := make(map[string]string, 0)
  328. deleteField := []string{}
  329. if e.Action == "insert" {
  330. fmt.Println(e.Action, e.Table.Name)
  331. if e.Table.Name == "downloadedplan" {
  332. fmt.Println(e.Table.Name)
  333. }
  334. if len(data) > 0 {
  335. Columns := make([]schema.TableColumn, 0)
  336. for _, v := range e.Table.Columns {
  337. if _, ok := data[v.Name]; ok {
  338. v.Name = data[v.Name]
  339. }
  340. Columns = append(Columns, v)
  341. }
  342. for datak, datav := range data {
  343. if datak == "deleteField" {
  344. continue
  345. }
  346. key := datak
  347. for _, v := range Columns {
  348. if key == v.Name || datav == v.Name {
  349. key = ""
  350. break
  351. }
  352. }
  353. if key != "" {
  354. Columns = append(Columns, schema.TableColumn{
  355. Name: key,
  356. })
  357. e.Rows[0] = append(e.Rows[0], datav)
  358. }
  359. }
  360. slist := []string{}
  361. for _, v := range Columns {
  362. slist = append(slist, v.Name)
  363. }
  364. if _, ok := data["deleteField"]; ok {
  365. // rows := e.Rows[0]
  366. var rowList [][]interface{}
  367. for _, row := range e.Rows {
  368. rows := row
  369. if e.Table.Name == "downloadedplan" {
  370. fmt.Println(len(rows), 1)
  371. }
  372. deleteFields := strings.Split(data["deleteField"], ",")
  373. for _, field := range deleteFields {
  374. index := -1
  375. for k, v := range Columns {
  376. if v.Name == field {
  377. index = k
  378. }
  379. }
  380. if index == 0 {
  381. Columns = Columns[index+1:]
  382. rows = rows[index+1:]
  383. } else if index == len(Columns)-1 {
  384. Columns = Columns[:index]
  385. rows = rows[:index]
  386. } else if index > 0 {
  387. var arr1 []schema.TableColumn
  388. arr1 = append(arr1, Columns[:index]...)
  389. arr1 = append(arr1, Columns[index+1:]...)
  390. Columns = arr1
  391. var arr2 []interface{}
  392. arr2 = append(arr2, rows[:index]...)
  393. arr2 = append(arr2, rows[index+1:]...)
  394. rows = arr2
  395. }
  396. if e.Table.Name == "downloadedplan" {
  397. fmt.Println(len(rows), 2)
  398. }
  399. rowList = append(rowList, rows)
  400. }
  401. }
  402. e.Rows = rowList
  403. // for k, _ := range e.Rows {
  404. // e.Rows[k] = rows
  405. // }
  406. }
  407. e.Table.Columns = Columns
  408. }
  409. } else {
  410. for k, v := range data {
  411. if k != "deleteField" {
  412. updateMap[k] = v
  413. } else {
  414. deleteField = strings.Split(v, ",")
  415. }
  416. }
  417. }
  418. query := ""
  419. if e.Action == "insert" {
  420. Columns := GetColumns(e)
  421. var rows []string
  422. var ids []string
  423. var idindex int
  424. for i, column := range Columns {
  425. if column == "id" {
  426. idindex = i
  427. break
  428. }
  429. }
  430. for _, value := range e.Rows {
  431. exist := false
  432. var frows []string
  433. for index, fvalue := range value {
  434. switch v := fvalue.(type) {
  435. case string:
  436. frows = append(frows, "'"+v+"'")
  437. case int8, int16, int32, int, int64:
  438. strV := fmt.Sprintf("%d", v)
  439. frows = append(frows, strV)
  440. case decimal.Decimal:
  441. frows = append(frows, v.String())
  442. case float32:
  443. frows = append(frows, strconv.FormatFloat(float64(v), 'f', -1, 32))
  444. case float64:
  445. frows = append(frows, strconv.FormatFloat(v, 'f', -1, 64))
  446. default:
  447. frows = append(frows, "NULL")
  448. }
  449. if index == idindex {
  450. if len(frows) > 0 {
  451. for _, id := range ids {
  452. if id == frows[len(frows)-1] {
  453. exist = true
  454. break
  455. }
  456. }
  457. if !exist {
  458. ids = append(ids, frows[len(frows)-1])
  459. }
  460. }
  461. }
  462. }
  463. if exist {
  464. continue
  465. }
  466. if strings.Trim(nodename, " ") != "" {
  467. frows = append(frows, "'"+nodevalue+"'")
  468. }
  469. rows = append(rows, "("+strings.Join(frows, ",")+")")
  470. }
  471. query = fmt.Sprintf(Insert, e.Table.Name, strings.Join(Columns, ","), strings.Join(rows, ","))
  472. } else if e.Action == "update" {
  473. var rows []string
  474. var where []string
  475. var uset []string
  476. for i, value := range e.Rows {
  477. if (i % 2) == 0 {
  478. where = append([]string{})
  479. for _, pk := range e.Table.PKColumns {
  480. name := e.Table.Columns[pk].Name
  481. if !old {
  482. if _, ok := updateMap[e.Table.Columns[pk].Name]; ok {
  483. name = updateMap[e.Table.Columns[pk].Name]
  484. }
  485. }
  486. isExist := false
  487. for _, fname := range deleteField {
  488. if fname == name {
  489. isExist = true
  490. break
  491. }
  492. }
  493. if isExist {
  494. continue
  495. }
  496. switch v := value[pk].(type) {
  497. case string:
  498. where = append(where, "`"+name+"` = '"+v+"'")
  499. case int8, int16, int32, int, int64:
  500. strV := fmt.Sprintf("%d", v)
  501. where = append(where, "`"+name+"` = "+strV)
  502. case decimal.Decimal:
  503. where = append(where, "`"+name+"` = "+v.String())
  504. case float32:
  505. where = append(where, "`"+name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
  506. case float64:
  507. where = append(where, "`"+name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
  508. default:
  509. where = append(where, "`"+name+"` is NULL")
  510. }
  511. }
  512. if strings.Trim(nodename, " ") != "" {
  513. where = append(where, nodename+" = "+"'"+nodevalue+"'")
  514. }
  515. // isExist := false
  516. // for _, w := range where {
  517. // if strings.Index(w, "pastureid") >= 0 {
  518. // isExist = true
  519. // break
  520. // }
  521. // }
  522. // if old && !isExist {
  523. // where = append(where, " pastureid"+" = "+"'"+pastureid+"'")
  524. // } else if !isExist {
  525. // where = append(where, " pasturecode"+" = "+"'"+pastureid+"'")
  526. // }
  527. } else {
  528. uset = append([]string{})
  529. for j, _ := range e.Table.Columns {
  530. xx := reflect.DeepEqual(e.Rows[i-1][j], value[j])
  531. if !xx {
  532. name := e.Table.Columns[j].Name
  533. if !old {
  534. if _, ok := updateMap[e.Table.Columns[j].Name]; ok {
  535. name = updateMap[e.Table.Columns[j].Name]
  536. }
  537. }
  538. isExist := false
  539. for _, fname := range deleteField {
  540. if fname == name {
  541. isExist = true
  542. break
  543. }
  544. }
  545. if isExist {
  546. continue
  547. }
  548. switch v := value[j].(type) {
  549. case string:
  550. uset = append(uset, "`"+name+"` = '"+v+"'")
  551. case int8, int16, int32, int, int64:
  552. strV := fmt.Sprintf("%d", v)
  553. uset = append(uset, "`"+name+"` = "+strV)
  554. case decimal.Decimal:
  555. uset = append(uset, "`"+name+"` = "+v.String())
  556. case float32:
  557. uset = append(uset, "`"+name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
  558. case float64:
  559. uset = append(uset, "`"+name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
  560. default:
  561. uset = append(uset, "`"+name+"` = NULL")
  562. }
  563. }
  564. }
  565. if len(uset) == 0 {
  566. return ""
  567. }
  568. rows = append(rows, fmt.Sprintf(Update, e.Table.Name, strings.Join(uset, ", "), strings.Join(where, " and ")))
  569. }
  570. }
  571. query = strings.Join(rows, "\n")
  572. } else if e.Action == "delete" {
  573. var rows []string
  574. for _, value := range e.Rows {
  575. var where []string
  576. for _, pk := range e.Table.PKColumns {
  577. name := e.Table.Columns[pk].Name
  578. if !old {
  579. if _, ok := updateMap[e.Table.Columns[pk].Name]; ok {
  580. name = updateMap[e.Table.Columns[pk].Name]
  581. }
  582. }
  583. isExist := false
  584. for _, fname := range deleteField {
  585. if fname == name {
  586. isExist = true
  587. break
  588. }
  589. }
  590. if isExist {
  591. continue
  592. }
  593. switch v := value[pk].(type) {
  594. case string:
  595. where = append(where, "`"+e.Table.Columns[pk].Name+"` = '"+v+"'")
  596. case int8, int16, int32, int, int64:
  597. strV := fmt.Sprintf("%d", v)
  598. where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strV)
  599. case float32:
  600. where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
  601. case float64:
  602. where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
  603. default:
  604. where = append(where, "`"+e.Table.Columns[pk].Name+"` is NULL")
  605. }
  606. }
  607. if strings.Trim(nodename, " ") != "" {
  608. where = append(where, nodename+" = "+"'"+nodevalue+"'")
  609. }
  610. // isExist := false
  611. // for _, w := range where {
  612. // if strings.Index(w, "pastureid") >= 0 {
  613. // isExist = true
  614. // break
  615. // }
  616. // }
  617. // if !old {
  618. // where = append(where, " pastureid"+" = "+"'"+pastureid+"'")
  619. // } else if isExist {
  620. // where = append(where, " pasturecode"+" = "+"'"+pastureid+"'")
  621. // }
  622. rows = append(rows, fmt.Sprintf(Delete, e.Table.Name, strings.Join(where, " and ")))
  623. }
  624. query = strings.Join(rows, "\n")
  625. }
  626. e.Table.Columns = ColumnsOld
  627. return query
  628. }
  629. type handler struct {
  630. canal.DummyEventHandler
  631. }
  632. func OpenFile(filename string) (*os.File, error) {
  633. if _, err := os.Stat(filename); os.IsNotExist(err) {
  634. fmt.Println("文件不存在")
  635. return os.Create(filename) //创建文件
  636. }
  637. fmt.Println("文件存在")
  638. return os.OpenFile(filename, os.O_APPEND, 0666) //打开文件
  639. }
  640. func (h *handler) OnRow(e *canal.RowsEvent) error {
  641. needsave := 0
  642. if len(tables) > 0 && len(tableDB) > 0 {
  643. subs := strings.Split(tables, ",")
  644. for _, value := range subs {
  645. if strings.ToLower(e.Table.Name) == strings.ToLower(value) {
  646. needsave = 1
  647. break
  648. }
  649. }
  650. } else {
  651. needsave = 1
  652. }
  653. if needsave > 0 {
  654. s := GetDML(e)
  655. if len(s) > 0 {
  656. if showlog > 0 {
  657. fmt.Println(s)
  658. }
  659. if strings.Index(s, "downloadedplan") > 0 {
  660. fmt.Println(s)
  661. }
  662. //insertlog(s)
  663. //sqlDeque.PushBack(s)
  664. //mysqlQueue.Enqueue(s)
  665. //WriteChWithSelect(mysqlch, s)
  666. //savesqltofile(s)
  667. err := DqueueIndex.PushOneIndex([]byte(s))
  668. if err != nil {
  669. fmt.Println("DqueueIndex.PushOneIndex([]byte(sqls)) err:", err)
  670. }
  671. }
  672. }
  673. return nil
  674. }
  675. func (h *handler) OnRotate(roateEvent *replication.RotateEvent) error {
  676. //writePosini(string(roateEvent.NextLogName), uint32(roateEvent.Position))
  677. setini(string(roateEvent.NextLogName), uint32(roateEvent.Position))
  678. return nil
  679. }
  680. func (h *handler) OnPosSynced(p mysql.Position, G mysql.GTIDSet, g bool) error {
  681. // fmt.Println(p.Name + " | " + strconv.Itoa(int(p.Pos)))
  682. endName = p.Name
  683. endPos = p.Pos
  684. //writePosini(p.Name, p.Pos)
  685. setini(p.Name, p.Pos)
  686. return nil
  687. }
  688. func (h *handler) String() string {
  689. return "TestHandler"
  690. }
  691. func Readini() {
  692. CurrentPath, _ = GetCurrentPath()
  693. h, _ := log.NewRotatingFileHandler(CurrentPath+"Consumer.log", 40*1024*1024, 10)
  694. logger = log.NewDefault(h)
  695. logger.SetLevelByName("info")
  696. inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
  697. if err != nil {
  698. logger.Errorln("读取配置文件失败1[config.ini]", err.Error())
  699. return
  700. }
  701. PosChan = make(chan Posname, 10000)
  702. go runsetini()
  703. host = inicfg.MustValue("canal", "host", "127.0.0.1")
  704. port = inicfg.MustInt("canal", "port", 3306)
  705. user = inicfg.MustValue("canal", "user", "root")
  706. password = inicfg.MustValue("canal", "password", "root")
  707. flavor = inicfg.MustValue("canal", "flavor", "mariadb")
  708. serverID = inicfg.MustInt("canal", "serverID", 101)
  709. mysqldump = inicfg.MustValue("canal", "mysqldump", "mysqldump")
  710. tables = inicfg.MustValue("canal", "tables", "")
  711. tableDB = inicfg.MustValue("canal", "tableDB", "")
  712. ignoreTables = inicfg.MustValue("canal", "ignoreTables", "")
  713. startName = inicfg.MustValue("canal", "startName", "")
  714. startPos = uint(inicfg.MustInt("canal", "startPos", 0))
  715. endName = startName
  716. endPos = uint32(startPos)
  717. heartbeatPeriod = inicfg.MustInt("canal", "heartbeatPeriod", 60)
  718. readTimeout = inicfg.MustInt("canal", "readTimeout", 90)
  719. nodename = inicfg.MustValue("canal", "nodename", "")
  720. nodevalue = inicfg.MustValue("canal", "nodevalue", "1")
  721. showlog = inicfg.MustInt("canal", "showlog", 1)
  722. onetimerows = inicfg.MustValue("canal", "onetimerows", "1000")
  723. KafkaEnable = inicfg.MustBool("kafka", "kafkaEnable", false)
  724. kafka_host = inicfg.MustValue("kafka", "kafka_host", "127.0.0.1")
  725. kafka_port = inicfg.MustInt("kafka", "kafka_port", 9092)
  726. kafka_topic = inicfg.MustValue("kafka", "kafka_topic", "kafka_go_test")
  727. saslEnable = inicfg.MustBool("kafka", "saslEnable", false)
  728. username = inicfg.MustValue("kafka", "username", "root")
  729. saslpassword = inicfg.MustValue("kafka", "saslpassword", "root")
  730. tlsEnable = inicfg.MustBool("kafka", "tlsEnable", false)
  731. clientcert = inicfg.MustValue("kafka", "clientcert", "")
  732. clientkey = inicfg.MustValue("kafka", "clientkey", "")
  733. cacert = inicfg.MustValue("kafka", "cacert", "")
  734. MqttEnable = inicfg.MustBool("mqtt", "mqttEnable", false)
  735. KptCattleId = inicfg.MustValue("mqtt", "kptCattleId", "888888")
  736. mqtt_nodevalue = nodevalue
  737. mqtt_host = inicfg.MustValue("mqtt", "host", "127.0.0.1")
  738. mqtt_port = inicfg.MustInt("mqtt", "port", 9092)
  739. mqtt_path = inicfg.MustValue("mqtt", "path", "")
  740. mqtt_qos = inicfg.MustInt("mqtt", "qos", 0)
  741. mqtt_topic = inicfg.MustValue("mqtt", "topic", "mqtt_topic")
  742. mqtt_saslEnable = inicfg.MustBool("mqtt", "saslEnable", false)
  743. mqtt_username = inicfg.MustValue("mqtt", "username", "127.0.0.1")
  744. mqtt_saslpassword = inicfg.MustValue("mqtt", "saslpassword", "")
  745. mqtt_tlsEnable = inicfg.MustBool("mqtt", "tlsEnable", false)
  746. mqtt_clientcert = inicfg.MustValue("mqtt", "clientcert", "")
  747. mqtt_clientkey = inicfg.MustValue("mqtt", "clientkey", "")
  748. mqtt_cacert = inicfg.MustValue("mqtt", "cacert", "")
  749. mqtt_CleanSession = inicfg.MustBool("mqtt", "CleanSession", true)
  750. old = inicfg.MustBool("mqtt", "old", true)
  751. pastureid = inicfg.MustValue("canal", "pastureid", "0")
  752. tableField = make(map[string]map[string]string, 0)
  753. for _, table := range strings.Split(tables, ",") {
  754. dataMap, _ := inicfg.GetSection(table)
  755. if dataMap != nil {
  756. tableField[table] = dataMap
  757. }
  758. }
  759. // fmt.Println(tableField)
  760. ServiceName = inicfg.MustValue("Service", "ServiceName", "KPTDataService")
  761. ServiceDisplayName = inicfg.MustValue("Service", "ServiceDisplayName", "KPTDataService")
  762. ServiceDescription = inicfg.MustValue("Service", "ServiceDescription", "科湃腾数据同步")
  763. openini()
  764. file := CurrentPath + "/pos/pos.db"
  765. defer slowpoke.Close(CurrentPath + "pos/pos.db")
  766. keyN := []byte("startName")
  767. keyP := []byte("startPos")
  768. resN, err := slowpoke.Get(file, keyN)
  769. if err != nil || string(resN) == "" {
  770. logger.Error(" slowpoke.Get startName err", err, " posname :", resN)
  771. } else {
  772. res, err := slowpoke.Get(file, keyP)
  773. if err != nil {
  774. logger.Error(" slowpoke.Get startPos err", err)
  775. } else {
  776. str, err := strconv.Atoi(string(res))
  777. if err != nil || str == 0 {
  778. logger.Error("strconv.Atoi err", err, " pos :", str)
  779. } else {
  780. startName = string(resN)
  781. startPos = uint(str)
  782. }
  783. logger.Info("read pos name ", startPos, startName)
  784. }
  785. }
  786. logger.Info("end pos name", startPos, startName)
  787. }
  788. func initPos(_user, _password, _host, _dbs string, _port int) (int, string, error) {
  789. connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, "mysql")
  790. var err error
  791. posdb, err = sql.Open("mysql", connecting)
  792. if err == nil {
  793. posdb.SetMaxOpenConns(10)
  794. posdb.SetMaxIdleConns(10)
  795. } else {
  796. return 0, "", err
  797. }
  798. _, err = posdb.Exec("CREATE DATABASE IF NOT EXISTS `sqllog`")
  799. posdb.Close()
  800. connecting = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, "sqllog")
  801. posdb, err = sql.Open("mysql", connecting)
  802. if err == nil {
  803. posdb.SetMaxOpenConns(10)
  804. posdb.SetMaxIdleConns(10)
  805. } else {
  806. return 0, "", err
  807. }
  808. _, _ = posdb.Exec("CREATE TABLE IF NOT EXISTS `mypos` (`id` INT(11) NOT NULL ,`pos` INT(11) DEFAULT 0,`file` VARCHAR(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=INNODB DEFAULT CHARSET=utf8;")
  809. rows := posdb.QueryRow("select pos,file from mypos where id = 0 ")
  810. pos := 0
  811. file := ""
  812. if err := rows.Scan(&pos, &file); err != nil {
  813. _, err = posdb.Exec("insert mypos(id,pos,file) value (0,0,'')")
  814. }
  815. return pos, file, err
  816. }
  817. func openini() {
  818. CurrentPath, _ = GetCurrentPath()
  819. var err error
  820. inicfg, err = goconfig.LoadConfigFile(CurrentPath + "config.ini")
  821. if err != nil {
  822. logger.Errorln("读取配置文件失败[config.ini]", err.Error())
  823. return
  824. }
  825. }
  826. func setini(name string, pos uint32) {
  827. posname := Posname{name, pos}
  828. PosChan <- posname
  829. }
  830. func runsetini() {
  831. for {
  832. posname := <-PosChan
  833. logger.Info("binlog pos :", posname.pos, " binlog posname :", posname.name)
  834. CurrentPath, _ = GetCurrentPath()
  835. inicfg.SetValue("canal", "startName", posname.name)
  836. inicfg.SetValue("canal", "startPos", strconv.Itoa(int(posname.pos)))
  837. file := CurrentPath + "pos/pos.db"
  838. err := slowpoke.Set(file, []byte("startName"), []byte(posname.name))
  839. if err != nil {
  840. logger.Error(" slowpoke.Set startName err", err)
  841. }
  842. err = slowpoke.Set(file, []byte("startPos"), []byte(strconv.Itoa(int(posname.pos))))
  843. if err != nil {
  844. logger.Error(" slowpoke.Set startPos err", err)
  845. }
  846. err = slowpoke.Close(CurrentPath + "pos/pos.db")
  847. if err != nil {
  848. logger.Error(" slowpoke.Close err", err)
  849. }
  850. }
  851. }
  852. //func saveini() {
  853. //
  854. //
  855. //}
  856. func writeini(name string, pos uint32) {
  857. CurrentPath, _ = GetCurrentPath()
  858. inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
  859. if err != nil {
  860. logger.Infoln("读取配置文件失败[config.ini]")
  861. return
  862. }
  863. inicfg.SetValue("canal", "startName", name)
  864. inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
  865. err = goconfig.SaveConfigFile(inicfg, CurrentPath+"config.ini")
  866. if err != nil {
  867. logger.Infoln("保存配置文件失败[config.ini]")
  868. return
  869. }
  870. }
  871. func writeini1(name string, pos uint32) {
  872. CurrentPath, _ = GetCurrentPath()
  873. inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
  874. if err != nil {
  875. logger.Infoln("读取配置文件失败[config.ini]")
  876. return
  877. }
  878. inicfg.SetValue("canal", "startName", name)
  879. inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
  880. err = goconfig.SaveConfigFile(inicfg, CurrentPath+"config.ini")
  881. if err != nil {
  882. logger.Infoln("保存配置文件失败[config.ini]")
  883. return
  884. }
  885. }
  886. func writePosini(name string, pos uint32) {
  887. println("pos", pos)
  888. tx, _ := posdb.Begin()
  889. _, err := tx.Exec("update mypos set pos = ? , file = ? where id = 0", pos, name)
  890. if err != nil {
  891. tx.Rollback()
  892. } else {
  893. tx.Commit()
  894. }
  895. //CurrentPath, _ = GetCurrentPath()
  896. //inicfg, err := goconfig.LoadConfigFile(CurrentPath + "configPos.ini")
  897. //if err != nil {
  898. // logger.Errorln("读取配置文件失败[configPos.ini]")
  899. // return
  900. //}
  901. //inicfg.SetValue("canal", "startName", name)
  902. //inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
  903. //err = goconfig.SaveConfigFile(inicfg, CurrentPath+"configPos.ini")
  904. //if err != nil {
  905. // logger.Errorln("保存配置文件失败[configPos.ini]")
  906. // return
  907. //}
  908. }
  909. func IntToBytes(n int) []byte {
  910. data := int64(n)
  911. bytebuf := bytes.NewBuffer([]byte{})
  912. binary.Write(bytebuf, binary.BigEndian, data)
  913. return bytebuf.Bytes()
  914. }
  915. func BytesToInt(bys []byte) int {
  916. bytebuff := bytes.NewBuffer(bys)
  917. var data int64
  918. binary.Read(bytebuff, binary.BigEndian, &data)
  919. return int(data)
  920. }
  921. func RunService() {
  922. //Readini()
  923. var err error
  924. DqueueIndex, err = OpenIndexFile(CurrentPath+"logs", 100000, 10000, false)
  925. if err != nil {
  926. fmt.Printf("create DqueueIndex err %v", err)
  927. os.Exit(1)
  928. }
  929. cfg := canal.NewDefaultConfig()
  930. cfg.Addr = fmt.Sprintf("%s:%d", host, port)
  931. cfg.User = user
  932. cfg.Password = password
  933. cfg.Flavor = flavor
  934. cfg.UseDecimal = true
  935. cfg.SemiSyncEnabled = true
  936. // cfg.ReadTimeout = time.Duration * 10
  937. cfg.ReadTimeout = time.Duration(readTimeout) * time.Second
  938. cfg.HeartbeatPeriod = time.Duration(heartbeatPeriod) * time.Second
  939. cfg.ServerID = uint32(serverID)
  940. cfg.Dump.ExecutionPath = CurrentPath + mysqldump
  941. cfg.Dump.DiscardErr = false
  942. cfg.Dump.MaxAllowedPacketMB = 500
  943. //cfg.Dump.TableDB = "tmrwatch"
  944. c, err := canal.NewCanal(cfg)
  945. if err != nil {
  946. fmt.Printf("create canal err %v", err)
  947. os.Exit(1)
  948. }
  949. if len(ignoreTables) > 0 {
  950. subs := strings.Split(ignoreTables, ",")
  951. for _, sub := range subs {
  952. if seps := strings.Split(sub, "."); len(seps) == 2 {
  953. c.AddDumpIgnoreTables(seps[0], seps[1])
  954. }
  955. }
  956. }
  957. if len(tables) > 0 && len(tableDB) > 0 {
  958. subs := strings.Split(tables, ",")
  959. c.AddDumpTables(tableDB, subs...)
  960. } else if len(tableDB) > 0 {
  961. subs := strings.Split(tableDB, ",")
  962. c.AddDumpDatabases(subs...)
  963. }
  964. c.SetEventHandler(&handler{})
  965. startPos_ := mysql.Position{
  966. Name: startName,
  967. Pos: uint32(startPos),
  968. }
  969. ctx := context.Background()
  970. ctx, cancel := context.WithCancel(ctx)
  971. go func() {
  972. if !(startPos > 0) {
  973. //initDB()
  974. //SqlBitcask.DeleteAll()
  975. //SqlBitcask.Merge()
  976. insertEmpty()
  977. }
  978. err = c.RunFrom(startPos_)
  979. if err != nil { //出错后,将重头开始上传
  980. fmt.Printf("start canal err %v", err)
  981. logger.Error("start canal err %v", err)
  982. c.Close()
  983. c, err := canal.NewCanal(cfg)
  984. if err != nil {
  985. fmt.Printf("create canal err %v", err)
  986. os.Exit(1)
  987. }
  988. if len(ignoreTables) > 0 {
  989. subs := strings.Split(ignoreTables, ",")
  990. for _, sub := range subs {
  991. if seps := strings.Split(sub, "."); len(seps) == 2 {
  992. c.AddDumpIgnoreTables(seps[0], seps[1])
  993. }
  994. }
  995. }
  996. if len(tables) > 0 && len(tableDB) > 0 {
  997. subs := strings.Split(tables, ",")
  998. c.AddDumpTables(tableDB, subs...)
  999. } else if len(tableDB) > 0 {
  1000. subs := strings.Split(tableDB, ",")
  1001. c.AddDumpDatabases(subs...)
  1002. }
  1003. c.SetEventHandler(&handler{})
  1004. ctx := context.Background()
  1005. ctx, cancel = context.WithCancel(ctx)
  1006. startPos_ = mysql.Position{
  1007. Name: "",
  1008. Pos: 0,
  1009. }
  1010. // initDB()
  1011. insertEmpty()
  1012. err = c.RunFrom(startPos_)
  1013. logger.Error("start canal err second %v", err)
  1014. }
  1015. }()
  1016. // s := cron.New()
  1017. // s.AddFunc("00 40 07 * * *", func() {
  1018. // insertEmpty()
  1019. // fmt.Printf("start canal err %v", err)
  1020. // logger.Error("start canal err %v", err)
  1021. // c.Close()
  1022. // c, err := canal.NewCanal(cfg)
  1023. // if err != nil {
  1024. // fmt.Printf("create canal err %v", err)
  1025. // os.Exit(1)
  1026. // }
  1027. // if len(ignoreTables) > 0 {
  1028. // subs := strings.Split(ignoreTables, ",")
  1029. // for _, sub := range subs {
  1030. // if seps := strings.Split(sub, "."); len(seps) == 2 {
  1031. // c.AddDumpIgnoreTables(seps[0], seps[1])
  1032. // }
  1033. // }
  1034. // }
  1035. // if len(tables) > 0 && len(tableDB) > 0 {
  1036. // subs := strings.Split(tables, ",")
  1037. // c.AddDumpTables(tableDB, subs...)
  1038. // } else if len(tableDB) > 0 {
  1039. // subs := strings.Split(tableDB, ",")
  1040. // c.AddDumpDatabases(subs...)
  1041. // }
  1042. // c.SetEventHandler(&handler{})
  1043. // ctx := context.Background()
  1044. // ctx, cancel = context.WithCancel(ctx)
  1045. // startPos_ = mysql.Position{
  1046. // Name: "",
  1047. // Pos: 0,
  1048. // }
  1049. // // initDB()
  1050. // err = c.RunFrom(startPos_)
  1051. // logger.Error("start canal err second %v", err)
  1052. // })
  1053. // s.Start()
  1054. //监听
  1055. //ctxListen := context.Background()
  1056. //ctxListen, cancelListen := context.WithCancel(ctxListen)
  1057. //go func() {
  1058. // ticker := time.Tick(60 * time.Second)
  1059. // for {
  1060. // select {
  1061. // case <-ticker:
  1062. // if time.Since(insertlog_lasttime) > 60*time.Second && Exectx != nil {
  1063. //
  1064. // err := Exectx.Commit()
  1065. // Exectx = nil
  1066. // if err != nil {
  1067. // fmt.Println("Exectx.Commit()ticker err:", err)
  1068. // }
  1069. // insertlog_lasttime = time.Now()
  1070. // }
  1071. // case <-ctxListen.Done():
  1072. // if Exectx != nil {
  1073. //
  1074. // err := Exectx.Commit()
  1075. // Exectx = nil
  1076. // if err != nil {
  1077. // fmt.Println("Exectx.Commit()ticker err:", err)
  1078. // }
  1079. // insertlog_lasttime = time.Now()
  1080. // fmt.Println("Exectx.Commit()tickerEXIT:", err)
  1081. // return
  1082. // }
  1083. //
  1084. // }
  1085. // }
  1086. //
  1087. //}()
  1088. if KafkaEnable {
  1089. go Kafka_producerDB(ctx)
  1090. } else if MqttEnable {
  1091. go Mqtt_producerDB(ctx)
  1092. }
  1093. Ch = make(chan int, 1)
  1094. <-Ch
  1095. //cancelListen()
  1096. cancel()
  1097. DqueueIndex.CloseIndex()
  1098. //setini(endName,endPos)
  1099. //writeini(endName,endPos)
  1100. fmt.Println("程序被关闭 ")
  1101. c.Close()
  1102. }
  1103. func NewCronWithBeijingLocation() *cron.Cron {
  1104. loc, err := time.LoadLocation("Asia/Shanghai")
  1105. if err != nil {
  1106. panic(err)
  1107. }
  1108. return cron.NewWithLocation(loc)
  1109. }
  1110. if _, ok := data["deleteField"]; ok {
  1111. rows := e.Rows[0]
  1112. deleteFields := strings.Split(data["deleteField"], ",")
  1113. for _, field := range deleteFields {
  1114. index := -1
  1115. for k, v := range Columns {
  1116. if v.Name == field {
  1117. index = k
  1118. }
  1119. }
  1120. if index == 0 {
  1121. Columns = Columns[index+1:]
  1122. rows = rows[index+1:]
  1123. } else if index == len(Columns)-1 {
  1124. Columns = Columns[:index]
  1125. rows = rows[:index]
  1126. } else if index > 0 {
  1127. var arr1 []schema.TableColumn
  1128. arr1 = append(arr1, Columns[:index]...)
  1129. arr1 = append(arr1, Columns[index+1:]...)
  1130. Columns = arr1
  1131. var arr2 []interface{}
  1132. arr2 = append(arr2, rows[:index]...)
  1133. arr2 = append(arr2, rows[index+1:]...)
  1134. rows = arr2
  1135. }
  1136. }
  1137. for k, _ := range e.Rows {
  1138. e.Rows[k] = rows
  1139. }
  1140. }