gocanal.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920
  1. package kpt
  2. import (
  3. "bytes"
  4. "context"
  5. "database/sql"
  6. "encoding/binary"
  7. "errors"
  8. "fmt"
  9. "github.com/Unknwon/goconfig"
  10. "github.com/go-mysql-org/go-mysql/canal"
  11. "github.com/go-mysql-org/go-mysql/mysql"
  12. "github.com/go-mysql-org/go-mysql/replication"
  13. _ "github.com/go-sql-driver/mysql" //导入mysql驱动包
  14. "github.com/recoilme/slowpoke"
  15. "github.com/shopspring/decimal"
  16. "github.com/siddontang/go-log/log"
  17. "os"
  18. "os/exec"
  19. "path/filepath"
  20. "reflect"
  21. "runtime"
  22. "strconv"
  23. "strings"
  24. "time"
  25. )
  26. var (
  27. host string //"MySQL host")
  28. port int //"MySQL port")
  29. user string // "MySQL user, must have replication privilege")
  30. password string // "MySQL password")
  31. flavor string // "Flavor: mysql or mariadb")
  32. serverID int // "Unique Server ID")
  33. mysqldump string // "mysqldump execution path")
  34. tables string // "dump tables, seperated by comma, will overwrite dbs")
  35. tableDB string // "database for dump tables")
  36. ignoreTables string // "ignore tables, must be database.table format, separated by comma")
  37. startName string // "start sync from binlog name")
  38. startPos uint // "start sync from binlog position of")
  39. endName string // "start sync from binlog name")
  40. endPos uint32 // "start sync from binlog position of")
  41. heartbeatPeriod int // "master heartbeat period")
  42. readTimeout int // "connection read timeout")
  43. nodename string //nodename key name
  44. nodevalue string //nodevalue value
  45. showlog int
  46. ServiceName string //服务显示名称
  47. ServiceDisplayName string //服务名称
  48. ServiceDescription string //服务描述
  49. KafkaEnable bool
  50. MqttEnable bool
  51. CurrentPath string
  52. mydb *sql.DB // 全局的数据库操作句柄
  53. posdb *sql.DB // 全局的数据库操作句柄
  54. inicfg *goconfig.ConfigFile
  55. PosChan chan Posname
  56. stmt *sql.Stmt
  57. deletestmt *sql.Stmt
  58. logger *log.Logger
  59. Ch chan int
  60. insertstmt string = ""
  61. insertcount int = 0
  62. insertcountv int = 0
  63. Exectx *sql.Tx
  64. insertlog_lasttime time.Time
  65. DqueueIndex *DQueue
  66. DqueueIndexMqtt *DQueue
  67. mqttDb *sql.DB
  68. KptCattleId string
  69. )
  70. const (
  71. Type = "mysql"
  72. Insert = `INSERT INTO %s(%s) VALUES%s;`
  73. Update = `UPDATE %s SET %s WHERE %s;`
  74. Delete = `DELETE FROM %s WHERE %s;`
  75. DeleteAll = `TRUNCATE TABLE %s`
  76. )
  77. type Posname struct {
  78. name string
  79. pos uint32
  80. }
  81. func ReadWithSelect(sqlch chan string) (x string, res bool) {
  82. select {
  83. case x = <-sqlch:
  84. return x, true
  85. default:
  86. return "", false
  87. }
  88. }
  89. func WriteChWithSelect(sqlch chan string, sql string) bool {
  90. timeout := time.NewTimer(time.Microsecond * 500)
  91. select {
  92. case sqlch <- sql:
  93. return true
  94. case <-timeout.C:
  95. {
  96. logger.Errorf("WriteChWithSelect 超时: %s", sql)
  97. fmt.Printf("WriteChWithSelect 超时: %s", sql)
  98. return false
  99. }
  100. }
  101. }
  102. func GetCurrentPath() (string, error) {
  103. file, err := exec.LookPath(os.Args[0])
  104. if err != nil {
  105. return "", err
  106. }
  107. path, err := filepath.Abs(file)
  108. if err != nil {
  109. return "", err
  110. }
  111. //fmt.Println("path111:", path)
  112. if runtime.GOOS == "windows" {
  113. path = strings.Replace(path, "\\", "/", -1)
  114. }
  115. //fmt.Println("path222:", path)
  116. i := strings.LastIndex(path, "/")
  117. if i < 0 {
  118. return "", errors.New(`Can't find "/" or "\".`)
  119. }
  120. //fmt.Println("path333:", path)
  121. return string(path[0 : i+1]), nil
  122. }
  123. func GetColumns(e *canal.RowsEvent) []string {
  124. var Columns []string
  125. for _, value := range e.Table.Columns {
  126. Columns = append(Columns, "`"+value.Name+"`")
  127. }
  128. if strings.Trim(nodename, " ") != "" {
  129. Columns = append(Columns, strings.Trim(nodename, " "))
  130. }
  131. return Columns
  132. }
  133. func initDB() {
  134. err := GetDbsConnect(user, password, host, port, "mysql")
  135. _, err = mydb.Exec("CREATE DATABASE IF NOT EXISTS `sqllog`")
  136. mydb.Close()
  137. err = GetDbsConnect(user, password, host, port, "sqllog")
  138. if err == nil {
  139. _, _ = mydb.Exec("CREATE TABLE `tablesqllog` (`id` int(11) NOT NULL AUTO_INCREMENT,`sql` text DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;")
  140. insertEmpty()
  141. }
  142. }
  143. func GetDbsConnect(_user, _password, _host string, _port int, _dbs string) error {
  144. connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, _dbs)
  145. var err error
  146. mydb, err = sql.Open("mysql", connecting)
  147. if err == nil {
  148. mydb.SetMaxOpenConns(10)
  149. mydb.SetMaxIdleConns(10)
  150. } else {
  151. return err
  152. }
  153. return nil
  154. }
  155. func GetmyDbsConnect(_user, _password, _host string, _port int, _dbs string) (*sql.DB, error) {
  156. connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, _dbs)
  157. mdb, err := sql.Open("mysql", connecting)
  158. if err == nil {
  159. mdb.SetMaxOpenConns(10)
  160. mdb.SetMaxIdleConns(10)
  161. } else {
  162. return nil, err
  163. }
  164. return mdb, nil
  165. }
  166. func insertEmpty() {
  167. if len(tables) > 0 && len(tableDB) > 0 {
  168. subs := strings.Split(tables, ",")
  169. for i := len(subs) - 1; i >= 0; i-- {
  170. var where []string
  171. where = append(where, " 1 = 1")
  172. if strings.Trim(nodename, " ") != "" {
  173. where = append(where, nodename+" = "+"'"+nodevalue+"'")
  174. }
  175. sqls := fmt.Sprintf(Delete, subs[i], strings.Join(where, " and "))
  176. //insertlog(sqls)
  177. //savesqltofile(sqls)
  178. err := DqueueIndex.PushOneIndex([]byte(sqls))
  179. if err != nil{
  180. fmt.Println("DqueueIndex.PushOneIndex([]byte(sqls)) err:",err)
  181. }
  182. }
  183. //for _, value := range subs {
  184. //}
  185. }
  186. }
  187. //func () {
  188. // for i:= 0 ; i<100000000;i++ {
  189. // select {
  190. // case <-c2 :
  191. // fmt.Println("c2退出")
  192. // return
  193. // default:
  194. // err =dqueuePop.PushOneIndex(
  195. // []byte("a"+strconv.Itoa(i)),
  196. // )
  197. // if err != nil{
  198. // fmt.Println("dqueuePop.PushOneIndex err",err)
  199. //
  200. // }else {
  201. // fmt.Println("dqueuePop.PushOneIndex sucess","a"+strconv.Itoa(i))
  202. // }
  203. // //time.Sleep(1*time.Millisecond)
  204. // }
  205. // //err =dqueue.Push([][]byte{
  206. // // []byte("a"+strconv.Itoa(i)),
  207. // // []byte("b"+strconv.Itoa(i)),
  208. // //})
  209. // }
  210. //}
  211. func appendinsertsql(sqlstr string) {
  212. if insertcount == 0 {
  213. insertstmt = "insert into tablesqllog(`sql`) values(\"" + sqlstr + "\")"
  214. } else {
  215. insertstmt = insertstmt + ",(\"" + sqlstr + "\")"
  216. }
  217. insertcount++
  218. }
  219. func insertlog(sqlstr string) {
  220. var err error
  221. if mydb == nil {
  222. connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", user, password, host, port, "sqllog")
  223. mydb, err = sql.Open("mysql", connecting)
  224. if err != nil {
  225. fmt.Println(" sql.Open err", err)
  226. }
  227. mydb.SetMaxOpenConns(100)
  228. mydb.SetMaxIdleConns(10)
  229. mydb.Ping()
  230. }
  231. if Exectx == nil {
  232. Exectx, err = mydb.Begin()
  233. if err != nil {
  234. fmt.Println(err)
  235. }
  236. }
  237. if Exectx != nil {
  238. _, err = Exectx.Exec("insert into tablesqllog(`sql`) values(?);", sqlstr)
  239. if err != nil {
  240. fmt.Println("Exectx.Exec(", err)
  241. } else {
  242. insertcount++
  243. if time.Since(insertlog_lasttime) > 60*time.Second {
  244. err := Exectx.Commit()
  245. Exectx = nil
  246. if err != nil {
  247. fmt.Println("Exectx.Commit():", err)
  248. }
  249. insertlog_lasttime = time.Now() // get current time
  250. } else {
  251. if insertcount > 10000 {
  252. err := Exectx.Commit()
  253. Exectx = nil
  254. if err != nil {
  255. fmt.Println("Exectx.Commit()insertcount:", err)
  256. }
  257. insertlog_lasttime = time.Now() // get current time
  258. insertcount = 0
  259. }
  260. }
  261. }
  262. }
  263. }
  264. func Exec(mdb *sql.DB, _exsqlstr string) error {
  265. if mdb != nil {
  266. _, err := mdb.Exec(_exsqlstr)
  267. if err != nil {
  268. return err
  269. }
  270. }
  271. return nil
  272. }
  273. func ExecT(mdb *sql.Tx, _exsqlstr string) error {
  274. if mdb != nil {
  275. _, err := mdb.Exec(_exsqlstr)
  276. if err != nil {
  277. return err
  278. }
  279. }
  280. return nil
  281. }
  282. func Get(mdb *sql.DB) (int, string) {
  283. var id int
  284. var sql string
  285. sql = ""
  286. if mdb != nil {
  287. //参数绑定,可以避免sql注入
  288. rows := mdb.QueryRow("select `id`,`sql` from `tablesqllog` order by id limit 1")
  289. err := rows.Scan(&id, &sql)
  290. if err != nil {
  291. return 0, ""
  292. }
  293. }
  294. return id, sql
  295. }
  296. func delete(mdb *sql.DB, id int) error {
  297. if deletestmt == nil {
  298. deletestmt, _ = mdb.Prepare("delete from `tablesqllog` where id <= ? ")
  299. }
  300. if deletestmt != nil {
  301. _, err := deletestmt.Exec(id)
  302. if err != nil {
  303. return err
  304. }
  305. }
  306. return nil
  307. }
  308. func GetDML(e *canal.RowsEvent) string {
  309. query := ""
  310. if e.Action == "insert" {
  311. Columns := GetColumns(e)
  312. var rows []string
  313. for _, value := range e.Rows {
  314. var frows []string
  315. for _, fvalue := range value {
  316. switch v := fvalue.(type) {
  317. case string:
  318. frows = append(frows, "'"+v+"'")
  319. case int8, int16, int32, int, int64:
  320. strV := fmt.Sprintf("%d", v)
  321. frows = append(frows, strV)
  322. case decimal.Decimal:
  323. frows = append(frows, v.String())
  324. case float32:
  325. frows = append(frows, strconv.FormatFloat(float64(v), 'f', -1, 32))
  326. case float64:
  327. frows = append(frows, strconv.FormatFloat(v, 'f', -1, 64))
  328. default:
  329. frows = append(frows, "NULL")
  330. }
  331. }
  332. if strings.Trim(nodename, " ") != "" {
  333. frows = append(frows, "'"+nodevalue+"'")
  334. }
  335. rows = append(rows, "("+strings.Join(frows, ",")+")")
  336. }
  337. query = fmt.Sprintf(Insert, e.Table.Name, strings.Join(Columns, ","), strings.Join(rows, ","))
  338. } else if e.Action == "update" {
  339. var rows []string
  340. var where []string
  341. var uset []string
  342. for i, value := range e.Rows {
  343. if (i % 2) == 0 {
  344. where = append([]string{})
  345. for _, pk := range e.Table.PKColumns {
  346. switch v := value[pk].(type) {
  347. case string:
  348. where = append(where, "`"+e.Table.Columns[pk].Name+"` = '"+v+"'")
  349. case int8, int16, int32, int, int64:
  350. strV := fmt.Sprintf("%d", v)
  351. where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strV)
  352. case decimal.Decimal:
  353. where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+v.String())
  354. case float32:
  355. where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
  356. case float64:
  357. where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
  358. default:
  359. where = append(where, "`"+e.Table.Columns[pk].Name+"` is NULL")
  360. }
  361. }
  362. if strings.Trim(nodename, " ") != "" {
  363. where = append(where, nodename+" = "+"'"+nodevalue+"'")
  364. }
  365. } else {
  366. uset = append([]string{})
  367. for j, _ := range e.Table.Columns {
  368. xx := reflect.DeepEqual(e.Rows[i-1][j], value[j])
  369. if !xx {
  370. switch v := value[j].(type) {
  371. case string:
  372. uset = append(uset, "`"+e.Table.Columns[j].Name+"` = '"+v+"'")
  373. case int8, int16, int32, int, int64:
  374. strV := fmt.Sprintf("%d", v)
  375. uset = append(uset, "`"+e.Table.Columns[j].Name+"` = "+strV)
  376. case decimal.Decimal:
  377. uset = append(uset, "`"+e.Table.Columns[j].Name+"` = "+v.String())
  378. case float32:
  379. uset = append(uset, "`"+e.Table.Columns[j].Name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
  380. case float64:
  381. uset = append(uset, "`"+e.Table.Columns[j].Name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
  382. default:
  383. uset = append(uset, "`"+e.Table.Columns[j].Name+"` = NULL")
  384. }
  385. }
  386. }
  387. rows = append(rows, fmt.Sprintf(Update, e.Table.Name, strings.Join(uset, ", "), strings.Join(where, " and ")))
  388. }
  389. }
  390. query = strings.Join(rows, "\n")
  391. } else if e.Action == "delete" {
  392. var rows []string
  393. for _, value := range e.Rows {
  394. var where []string
  395. for _, pk := range e.Table.PKColumns {
  396. switch v := value[pk].(type) {
  397. case string:
  398. where = append(where, "`"+e.Table.Columns[pk].Name+"` = '"+v+"'")
  399. case int8, int16, int32, int, int64:
  400. strV := fmt.Sprintf("%d", v)
  401. where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strV)
  402. case float32:
  403. where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(float64(v), 'f', -1, 32))
  404. case float64:
  405. where = append(where, "`"+e.Table.Columns[pk].Name+"` = "+strconv.FormatFloat(v, 'f', -1, 64))
  406. default:
  407. where = append(where, "`"+e.Table.Columns[pk].Name+"` is NULL")
  408. }
  409. }
  410. if strings.Trim(nodename, " ") != "" {
  411. where = append(where, nodename+" = "+"'"+nodevalue+"'")
  412. }
  413. rows = append(rows, fmt.Sprintf(Delete, e.Table.Name, strings.Join(where, " and ")))
  414. }
  415. query = strings.Join(rows, "\n")
  416. }
  417. return query
  418. }
  419. type handler struct {
  420. canal.DummyEventHandler
  421. }
  422. func (h *handler) OnRow(e *canal.RowsEvent) error {
  423. needsave := 0
  424. if len(tables) > 0 && len(tableDB) > 0 {
  425. subs := strings.Split(tables, ",")
  426. for _, value := range subs {
  427. if strings.ToLower(e.Table.Name) == strings.ToLower(value) {
  428. needsave = 1
  429. break
  430. }
  431. }
  432. } else {
  433. needsave = 1
  434. }
  435. if needsave > 0 {
  436. s := GetDML(e)
  437. if len(s) > 0 {
  438. if showlog > 0 {
  439. fmt.Println(s)
  440. }
  441. //insertlog(s)
  442. //sqlDeque.PushBack(s)
  443. //mysqlQueue.Enqueue(s)
  444. //WriteChWithSelect(mysqlch, s)
  445. //savesqltofile(s)
  446. err := DqueueIndex.PushOneIndex([]byte(s))
  447. if err != nil{
  448. fmt.Println("DqueueIndex.PushOneIndex([]byte(sqls)) err:",err)
  449. }
  450. }
  451. }
  452. return nil
  453. }
  454. func (h *handler) OnRotate(roateEvent *replication.RotateEvent) error {
  455. //writePosini(string(roateEvent.NextLogName), uint32(roateEvent.Position))
  456. setini(string(roateEvent.NextLogName), uint32(roateEvent.Position))
  457. return nil
  458. }
  459. func (h *handler) OnPosSynced(p mysql.Position, G mysql.GTIDSet, g bool) error {
  460. // fmt.Println(p.Name + " | " + strconv.Itoa(int(p.Pos)))
  461. endName = p.Name
  462. endPos = p.Pos
  463. //writePosini(p.Name, p.Pos)
  464. setini(p.Name, p.Pos)
  465. return nil
  466. }
  467. func (h *handler) String() string {
  468. return "TestHandler"
  469. }
  470. func Readini() {
  471. CurrentPath, _ = GetCurrentPath()
  472. h, _ := log.NewRotatingFileHandler(CurrentPath+"Consumer.log", 40*1024*1024, 10)
  473. logger = log.NewDefault(h)
  474. logger.SetLevelByName("info")
  475. inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
  476. if err != nil {
  477. logger.Errorln("读取配置文件失败1[config.ini]",err.Error())
  478. return
  479. }
  480. PosChan = make(chan Posname,10000)
  481. go runsetini()
  482. host = inicfg.MustValue("canal", "host", "127.0.0.1")
  483. port = inicfg.MustInt("canal", "port", 3306)
  484. user = inicfg.MustValue("canal", "user", "root")
  485. password = inicfg.MustValue("canal", "password", "root")
  486. flavor = inicfg.MustValue("canal", "flavor", "mariadb")
  487. serverID = inicfg.MustInt("canal", "serverID", 101)
  488. mysqldump = inicfg.MustValue("canal", "mysqldump", "mysqldump")
  489. tables = inicfg.MustValue("canal", "tables", "")
  490. tableDB = inicfg.MustValue("canal", "tableDB", "")
  491. ignoreTables = inicfg.MustValue("canal", "ignoreTables", "")
  492. startName = inicfg.MustValue("canal", "startName", "")
  493. startPos = uint(inicfg.MustInt("canal", "startPos", 0))
  494. endName = startName
  495. endPos = uint32(startPos)
  496. heartbeatPeriod = inicfg.MustInt("canal", "heartbeatPeriod", 60)
  497. readTimeout = inicfg.MustInt("canal", "readTimeout", 90)
  498. nodename = inicfg.MustValue("canal", "nodename", "")
  499. nodevalue = inicfg.MustValue("canal", "nodevalue", "1")
  500. showlog = inicfg.MustInt("canal", "showlog", 1)
  501. onetimerows = inicfg.MustValue("canal", "onetimerows", "1000")
  502. KafkaEnable = inicfg.MustBool("kafka", "kafkaEnable", false)
  503. kafka_host = inicfg.MustValue("kafka", "kafka_host", "127.0.0.1")
  504. kafka_port = inicfg.MustInt("kafka", "kafka_port", 9092)
  505. kafka_topic = inicfg.MustValue("kafka", "kafka_topic", "kafka_go_test")
  506. saslEnable = inicfg.MustBool("kafka", "saslEnable", false)
  507. username = inicfg.MustValue("kafka", "username", "root")
  508. saslpassword = inicfg.MustValue("kafka", "saslpassword", "root")
  509. tlsEnable = inicfg.MustBool("kafka", "tlsEnable", false)
  510. clientcert = inicfg.MustValue("kafka", "clientcert", "")
  511. clientkey = inicfg.MustValue("kafka", "clientkey", "")
  512. cacert = inicfg.MustValue("kafka", "cacert", "")
  513. MqttEnable = inicfg.MustBool("mqtt", "mqttEnable", false)
  514. KptCattleId = inicfg.MustValue("mqtt", "kptCattleId", "888888")
  515. mqtt_nodevalue = nodevalue
  516. mqtt_host = inicfg.MustValue("mqtt", "host", "127.0.0.1")
  517. mqtt_port = inicfg.MustInt("mqtt", "port", 9092)
  518. mqtt_path = inicfg.MustValue("mqtt", "path", "")
  519. mqtt_qos = inicfg.MustInt("mqtt", "qos", 0)
  520. mqtt_topic = inicfg.MustValue("mqtt", "topic", "mqtt_topic")
  521. mqtt_saslEnable = inicfg.MustBool("mqtt", "saslEnable", false)
  522. mqtt_username = inicfg.MustValue("mqtt", "username", "127.0.0.1")
  523. mqtt_saslpassword = inicfg.MustValue("mqtt", "saslpassword", "")
  524. mqtt_tlsEnable = inicfg.MustBool("mqtt", "tlsEnable", false)
  525. mqtt_clientcert = inicfg.MustValue("mqtt", "clientcert", "")
  526. mqtt_clientkey = inicfg.MustValue("mqtt", "clientkey", "")
  527. mqtt_cacert = inicfg.MustValue("mqtt", "cacert", "")
  528. mqtt_CleanSession = inicfg.MustBool("mqtt", "CleanSession", true)
  529. ServiceName = inicfg.MustValue("Service", "ServiceName", "KPTDataService")
  530. ServiceDisplayName = inicfg.MustValue("Service", "ServiceDisplayName", "KPTDataService")
  531. ServiceDescription = inicfg.MustValue("Service", "ServiceDescription", "科湃腾数据同步")
  532. openini()
  533. file := CurrentPath +"/pos/pos.db"
  534. defer slowpoke.Close(CurrentPath +"pos/pos.db")
  535. keyN := []byte("startName")
  536. keyP := []byte("startPos")
  537. resN, err := slowpoke.Get(file, keyN)
  538. if err != nil || string(resN) == "" {
  539. logger.Error(" slowpoke.Get startName err",err," posname :",resN)
  540. }else{
  541. res, err := slowpoke.Get(file, keyP)
  542. if err != nil {
  543. logger.Error(" slowpoke.Get startPos err",err)
  544. }else{
  545. str,err := strconv.Atoi(string(res))
  546. if err != nil || str == 0 {
  547. logger.Error("strconv.Atoi err",err," pos :",str)
  548. }else {
  549. startName = string(resN)
  550. startPos = uint(str)
  551. }
  552. logger.Info("read pos name ", startPos, startName)
  553. }
  554. }
  555. logger.Info("end pos name", startPos, startName)
  556. }
  557. func initPos(_user, _password, _host,_dbs string, _port int)(int,string,error){
  558. connecting := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, "mysql")
  559. var err error
  560. posdb, err = sql.Open("mysql", connecting)
  561. if err == nil {
  562. posdb.SetMaxOpenConns(10)
  563. posdb.SetMaxIdleConns(10)
  564. } else {
  565. return 0,"",err
  566. }
  567. _, err = posdb.Exec("CREATE DATABASE IF NOT EXISTS `sqllog`")
  568. posdb.Close()
  569. connecting = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", _user, _password, _host, _port, "sqllog")
  570. posdb, err = sql.Open("mysql", connecting)
  571. if err == nil {
  572. posdb.SetMaxOpenConns(10)
  573. posdb.SetMaxIdleConns(10)
  574. } else {
  575. return 0,"",err
  576. }
  577. _, _ = posdb.Exec("CREATE TABLE IF NOT EXISTS `mypos` (`id` INT(11) NOT NULL ,`pos` INT(11) DEFAULT 0,`file` VARCHAR(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=INNODB DEFAULT CHARSET=utf8;")
  578. rows := posdb.QueryRow("select pos,file from mypos where id = 0 ")
  579. pos := 0
  580. file := ""
  581. if err := rows.Scan(&pos, &file); err != nil {
  582. _,err = posdb.Exec("insert mypos(id,pos,file) value (0,0,'')")
  583. }
  584. return pos,file,err
  585. }
  586. func openini() {
  587. CurrentPath, _ = GetCurrentPath()
  588. var err error
  589. inicfg, err = goconfig.LoadConfigFile(CurrentPath + "config.ini")
  590. if err != nil {
  591. logger.Errorln("读取配置文件失败[config.ini]",err.Error())
  592. return
  593. }
  594. }
  595. func setini(name string, pos uint32) {
  596. posname := Posname{name,pos}
  597. PosChan <- posname
  598. }
  599. func runsetini() {
  600. for{
  601. posname := <-PosChan
  602. logger.Info("binlog pos :",posname.pos," binlog posname :",posname.name)
  603. CurrentPath, _ = GetCurrentPath()
  604. inicfg.SetValue("canal", "startName", posname.name)
  605. inicfg.SetValue("canal", "startPos", strconv.Itoa(int(posname.pos)))
  606. file := CurrentPath +"pos/pos.db"
  607. err :=slowpoke.Set(file, []byte("startName"), []byte(posname.name))
  608. if err != nil {
  609. logger.Error(" slowpoke.Set startName err",err)
  610. }
  611. err = slowpoke.Set(file, []byte("startPos"),[]byte(strconv.Itoa(int(posname.pos))))
  612. if err != nil {
  613. logger.Error(" slowpoke.Set startPos err",err)
  614. }
  615. err = slowpoke.Close(CurrentPath +"pos/pos.db")
  616. if err != nil {
  617. logger.Error(" slowpoke.Close err",err)
  618. }
  619. }
  620. }
  621. //func saveini() {
  622. //
  623. //
  624. //}
  625. func writeini(name string, pos uint32) {
  626. CurrentPath, _ = GetCurrentPath()
  627. inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
  628. if err != nil {
  629. logger.Infoln("读取配置文件失败[config.ini]")
  630. return
  631. }
  632. inicfg.SetValue("canal", "startName", name)
  633. inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
  634. err = goconfig.SaveConfigFile(inicfg, CurrentPath+"config.ini")
  635. if err != nil {
  636. logger.Infoln("保存配置文件失败[config.ini]")
  637. return
  638. }
  639. }
  640. func writeini1(name string, pos uint32) {
  641. CurrentPath, _ = GetCurrentPath()
  642. inicfg, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
  643. if err != nil {
  644. logger.Infoln("读取配置文件失败[config.ini]")
  645. return
  646. }
  647. inicfg.SetValue("canal", "startName", name)
  648. inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
  649. err = goconfig.SaveConfigFile(inicfg, CurrentPath+"config.ini")
  650. if err != nil {
  651. logger.Infoln("保存配置文件失败[config.ini]")
  652. return
  653. }
  654. }
  655. func writePosini(name string, pos uint32) {
  656. println("pos",pos)
  657. tx,_ := posdb.Begin()
  658. _,err :=tx.Exec("update mypos set pos = ? , file = ? where id = 0",pos,name)
  659. if err != nil{
  660. tx.Rollback()
  661. }else {
  662. tx.Commit()
  663. }
  664. //CurrentPath, _ = GetCurrentPath()
  665. //inicfg, err := goconfig.LoadConfigFile(CurrentPath + "configPos.ini")
  666. //if err != nil {
  667. // logger.Errorln("读取配置文件失败[configPos.ini]")
  668. // return
  669. //}
  670. //inicfg.SetValue("canal", "startName", name)
  671. //inicfg.SetValue("canal", "startPos", strconv.Itoa(int(pos)))
  672. //err = goconfig.SaveConfigFile(inicfg, CurrentPath+"configPos.ini")
  673. //if err != nil {
  674. // logger.Errorln("保存配置文件失败[configPos.ini]")
  675. // return
  676. //}
  677. }
  678. func IntToBytes(n int) []byte {
  679. data := int64(n)
  680. bytebuf := bytes.NewBuffer([]byte{})
  681. binary.Write(bytebuf, binary.BigEndian, data)
  682. return bytebuf.Bytes()
  683. }
  684. func BytesToInt(bys []byte) int {
  685. bytebuff := bytes.NewBuffer(bys)
  686. var data int64
  687. binary.Read(bytebuff, binary.BigEndian, &data)
  688. return int(data)
  689. }
  690. func RunService() {
  691. //Readini()
  692. var err error
  693. DqueueIndex,err = OpenIndexFile(CurrentPath+ "logs",100000,10000,false)
  694. if err != nil {
  695. fmt.Printf("create DqueueIndex err %v", err)
  696. os.Exit(1)
  697. }
  698. cfg := canal.NewDefaultConfig()
  699. cfg.Addr = fmt.Sprintf("%s:%d", host, port)
  700. cfg.User = user
  701. cfg.Password = password
  702. cfg.Flavor = flavor
  703. cfg.UseDecimal = true
  704. cfg.ReadTimeout = time.Duration(readTimeout) * time.Second
  705. cfg.HeartbeatPeriod = time.Duration(heartbeatPeriod) * time.Second
  706. cfg.ServerID = uint32(serverID)
  707. cfg.Dump.ExecutionPath = CurrentPath + mysqldump
  708. cfg.Dump.DiscardErr = false
  709. cfg.Dump.MaxAllowedPacketMB = 500
  710. //cfg.Dump.TableDB = "tmrwatch"
  711. c, err := canal.NewCanal(cfg)
  712. if err != nil {
  713. fmt.Printf("create canal err %v", err)
  714. os.Exit(1)
  715. }
  716. if len(ignoreTables) > 0 {
  717. subs := strings.Split(ignoreTables, ",")
  718. for _, sub := range subs {
  719. if seps := strings.Split(sub, "."); len(seps) == 2 {
  720. c.AddDumpIgnoreTables(seps[0], seps[1])
  721. }
  722. }
  723. }
  724. if len(tables) > 0 && len(tableDB) > 0 {
  725. subs := strings.Split(tables, ",")
  726. c.AddDumpTables(tableDB, subs...)
  727. } else if len(tableDB) > 0 {
  728. subs := strings.Split(tableDB, ",")
  729. c.AddDumpDatabases(subs...)
  730. }
  731. c.SetEventHandler(&handler{})
  732. startPos_ := mysql.Position{
  733. Name: startName,
  734. Pos: uint32(startPos),
  735. }
  736. ctx := context.Background()
  737. ctx, cancel := context.WithCancel(ctx)
  738. go func() {
  739. if !(startPos > 0) {
  740. initDB()
  741. //SqlBitcask.DeleteAll()
  742. //SqlBitcask.Merge()
  743. insertEmpty()
  744. }
  745. err = c.RunFrom(startPos_)
  746. if err != nil { //出错后,将重头开始上传
  747. fmt.Printf("start canal err %v", err)
  748. logger.Error("start canal err %v", err)
  749. c.Close()
  750. c, err := canal.NewCanal(cfg)
  751. if err != nil {
  752. fmt.Printf("create canal err %v", err)
  753. os.Exit(1)
  754. }
  755. if len(ignoreTables) > 0 {
  756. subs := strings.Split(ignoreTables, ",")
  757. for _, sub := range subs {
  758. if seps := strings.Split(sub, "."); len(seps) == 2 {
  759. c.AddDumpIgnoreTables(seps[0], seps[1])
  760. }
  761. }
  762. }
  763. if len(tables) > 0 && len(tableDB) > 0 {
  764. subs := strings.Split(tables, ",")
  765. c.AddDumpTables(tableDB, subs...)
  766. } else if len(tableDB) > 0 {
  767. subs := strings.Split(tableDB, ",")
  768. c.AddDumpDatabases(subs...)
  769. }
  770. c.SetEventHandler(&handler{})
  771. ctx := context.Background()
  772. ctx, cancel = context.WithCancel(ctx)
  773. startPos_ = mysql.Position{
  774. Name: "",
  775. Pos: 0,
  776. }
  777. initDB()
  778. insertEmpty()
  779. err = c.RunFrom(startPos_)
  780. logger.Error("start canal err second %v", err)
  781. }
  782. }()
  783. //监听
  784. //ctxListen := context.Background()
  785. //ctxListen, cancelListen := context.WithCancel(ctxListen)
  786. //go func() {
  787. // ticker := time.Tick(60 * time.Second)
  788. // for {
  789. // select {
  790. // case <-ticker:
  791. // if time.Since(insertlog_lasttime) > 60*time.Second && Exectx != nil {
  792. //
  793. // err := Exectx.Commit()
  794. // Exectx = nil
  795. // if err != nil {
  796. // fmt.Println("Exectx.Commit()ticker err:", err)
  797. // }
  798. // insertlog_lasttime = time.Now()
  799. // }
  800. // case <-ctxListen.Done():
  801. // if Exectx != nil {
  802. //
  803. // err := Exectx.Commit()
  804. // Exectx = nil
  805. // if err != nil {
  806. // fmt.Println("Exectx.Commit()ticker err:", err)
  807. // }
  808. // insertlog_lasttime = time.Now()
  809. // fmt.Println("Exectx.Commit()tickerEXIT:", err)
  810. // return
  811. // }
  812. //
  813. // }
  814. // }
  815. //
  816. //}()
  817. if KafkaEnable {
  818. go Kafka_producerDB(ctx)
  819. }else if MqttEnable {
  820. go Mqtt_producerDB(ctx)
  821. }
  822. Ch = make(chan int, 1)
  823. <-Ch
  824. //cancelListen()
  825. cancel()
  826. DqueueIndex.CloseIndex()
  827. //setini(endName,endPos)
  828. //writeini(endName,endPos)
  829. fmt.Println("程序被关闭 ")
  830. c.Close()
  831. }