123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682 |
- package kpt
- import (
- "context"
- "crypto/tls"
- "flag"
- "fmt"
- "log"
- "os"
- "strconv"
- "strings"
- "time"
- MQTT "github.com/eclipse/paho.mqtt.golang"
- "github.com/go-mysql-org/go-mysql/canal"
- "github.com/go-mysql-org/go-mysql/mysql"
- "github.com/golang/snappy"
- "github.com/vmihailenco/msgpack/v5"
- )
// MQTT settings shared by the producers and consumers below. None of them is
// assigned in the code shown here; they are expected to be populated before
// any of the Mqtt_*/Mq_* functions run.
var (
	// Broker endpoint parts ("<scheme>://host:port<path>") and the topic/QoS
	// used for both publish and subscribe.
	mqtt_host, mqtt_path, mqtt_topic string
	mqtt_port, mqtt_qos              int

	// Username/password are applied only when mqtt_saslEnable is true.
	mqtt_saslEnable                  bool
	mqtt_username, mqtt_saslpassword string

	// The client certificate/key pair is loaded only when mqtt_tlsEnable is
	// true. mqtt_cacert is not referenced by the code shown here.
	mqtt_tlsEnable                               bool
	mqtt_clientcert, mqtt_clientkey, mqtt_cacert string

	// Client ID and clean-session flag handed to the MQTT client options.
	mqtt_nodevalue    string
	mqtt_CleanSession bool

	// Not referenced by the code shown here.
	mqtt_Breaki      int
	mqtt_onetimerows string

	// temp_i counts successful publishes made by Mqtt_producerDB.
	temp_i int
)
// SqlItem wraps a single SQL statement string.
type SqlItem struct {
	Sql string // statement text
}
// TransferItem is one element of the msgpack-encoded batch exchanged over
// MQTT between Mqtt_producerDB and Mq_Consumer.
type TransferItem struct {
	// Ftype is not set or read by the code shown here.
	Ftype string
	// Content is the raw queue record on the producer side; the consumer
	// treats it as ";"-separated SQL text.
	Content []byte
}
- func SqlItemBuilder() interface{} {
- return &SqlItem{}
- }
- func reconnect(connOpts MQTT.Client) {
- for {
- fmt.Println("尝试重新连接...")
- if token := connOpts.Connect(); token.Wait() && token.Error() == nil {
- fmt.Println("重新连接成功")
- return
- }
- time.Sleep(5 * time.Second) // 等待5秒后再次尝试
- }
- }
- func clearPos() {
- c, err := canal.NewCanal(cfg)
- if err != nil {
- fmt.Printf("create canal err %v", err)
- os.Exit(1)
- }
- if len(ignoreTables) > 0 {
- subs := strings.Split(ignoreTables, ",")
- for _, sub := range subs {
- if seps := strings.Split(sub, "."); len(seps) == 2 {
- c.AddDumpIgnoreTables(seps[0], seps[1])
- }
- }
- }
- if len(tables) > 0 && len(tableDB) > 0 {
- subs := strings.Split(tables, ",")
- c.AddDumpTables(tableDB, subs...)
- } else if len(tableDB) > 0 {
- subs := strings.Split(tableDB, ",")
- c.AddDumpDatabases(subs...)
- }
- c.SetEventHandler(&handler{})
- // ctx := context.Background()
- // ctx, cancel = context.WithCancel(ctx)
- startPos_ := mysql.Position{
- Name: "",
- Pos: 0,
- }
- // initDB()
- insertEmpty()
- err = c.RunFrom(startPos_)
- logger.Error("start canal err second %v", err)
- }
- func onMessageReceived(client MQTT.Client, message MQTT.Message) {
- err := DqueueIndexMqtt.PushOneIndex(message.Payload()[:])
- if err != nil {
- logger.Errorln("DqueueIndexMqtt.PushOneIndex,err", err)
- }
- }
- func Mqtt_producerDB(ctx context.Context) {
- fmt.Println("Mqtt Clint 启动")
- retained := flag.Bool("retained", false, "Are the messages sent with the retained flag")
- connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves.
- connOpts.SetClientID(mqtt_nodevalue)
- connOpts.SetMaxReconnectInterval(1 * time.Second)
- connOpts.SetCleanSession(mqtt_CleanSession)
- connOpts.AutoReconnect = true
- brokerURL := fmt.Sprintf("tcp://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
- int_, err := strconv.Atoi(onetimerows)
- if err == nil {
- } else {
- logger.Errorln("转换onetimerowsMqtt错误 \r\n", err.Error())
- }
- if mqtt_saslEnable {
- if mqtt_username != "" {
- connOpts.SetUsername(mqtt_username)
- if mqtt_saslpassword != "" {
- connOpts.SetPassword(mqtt_saslpassword)
- }
- }
- }
- if mqtt_tlsEnable {
- cer, err := tls.LoadX509KeyPair(mqtt_clientcert, mqtt_clientkey)
- if err != nil {
- println("LoadX509KeyPair err :", err.Error())
- }
- connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
- brokerURL = fmt.Sprintf("tcps://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
- }
- connOpts.OnConnect = func(c MQTT.Client) {
- //if token := c.Subscribe(mqtt_topic, byte(mqtt_qos), onMessageReceived); token.Wait() && token.Error() != nil {
- // panic(token.Error())
- //}
- }
- connOpts.OnConnectionLost = func(c MQTT.Client, reason error) {
- println("onlost reason", reason.Error())
- clearPos()
- reconnect(c)
- }
- connOpts.AddBroker(brokerURL)
- //connOpts.OnConnect = func(c MQTT.Client) {
- // if token := c.Subscribe(mqtt_topic, byte(mqtt_qos), onMessageReceived); token.Wait() && token.Error() != nil {
- // //panic(token.Error())
- // fmt.Println("panic(token.Error())",token.Error())
- // }else{
- // //fmt.Println("",token,token.Error(),mqtt_topic)
- // }
- //
- //}
- // var reconnectHandler mqtt.ReconnectHandler = func(client mqtt.Client, opts *mqtt.ClientOptions) {
- // fmt.Println("Reconnecting to MQTT broker...")
- // if token := client.Connect(); token.Wait() && token.Error() != nil {
- // log.Fatalf("Failed to reconnect: %v", token.Error())
- // }
- // }
- // connOpts.OnReconnecting = reconnectHandler
- mqttClient := MQTT.NewClient(connOpts)
- if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
- logger.Errorln("[MQTT] Connected")
- for {
- select {
- case <-ctx.Done():
- fmt.Println("mqtt 退出")
- return
- default:
- PopPeeks, err, full, null := DqueueIndex.PopPeeksV2(int_)
- if err != nil {
- fmt.Println("PopPeeks,err ", err)
- } else {
- if len(PopPeeks) == 0 {
- if !full && null {
- DqueueIndex.PopOneIndex()
- }
- time.Sleep(time.Second)
- continue
- }
- //fmt.Println("PopPeeks==",string(PopPeeks[0]))
- datasSlice := make([]TransferItem, len(PopPeeks))
- for key, value := range PopPeeks {
- //println("resql =========",string(value))
- datasSlice[key].Content = value
- }
- b1, err := msgpack.Marshal(&datasSlice)
- if err != nil {
- panic(err)
- }
- //fmt.Println("encode", len(b1))
- encode := snappy.Encode(nil, b1)
- logger.Infoln("encode", len(encode))
- //println("encode",len(encode))
- token := mqttClient.Publish(mqtt_topic, byte(mqtt_qos), *retained, encode)
- //if token.Wait() && token.Error() != nil {
- // fmt.Println("=============",token.Error())
- //}
- if token.Wait() && token.Error() != nil {
- if showlog > 0 {
- fmt.Println(" mqttClient.Publish err:", mqtt_topic, string(PopPeeks[0]), err)
- }
- logger.Errorln("Client.Publish err:--- ", token.Error())
- } else {
- temp_i++
- //fmt.Println(temp_i)
- logger.Infoln("Client.Publish len:--- ", len(encode))
- if !full && null {
- for i := 0; i < len(PopPeeks)+1; i++ {
- _, _ = DqueueIndex.PopOneIndex()
- if showlog > 0 {
- //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()1", string(pop), err)
- }
- }
- } else {
- for i := 0; i < len(PopPeeks); i++ {
- _, _ = DqueueIndex.PopOneIndex()
- if showlog > 0 {
- //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()1", string(pop), err)
- }
- }
- }
- }
- }
- }
- }
- }
- func Mq_Consumer(ctx context.Context) {
- var err error
- DqueueIndexMqtt, err = OpenIndexFile(CurrentPath+"logs", 100000, 10000, false)
- if err != nil {
- fmt.Printf("create DqueueIndex err %v", err)
- os.Exit(1)
- }
- fmt.Println("Mqtt Clint 启动")
- connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves.
- connOpts.SetClientID(mqtt_nodevalue)
- connOpts.SetMaxReconnectInterval(1 * time.Second)
- connOpts.SetCleanSession(mqtt_CleanSession)
- connOpts.AutoReconnect = true
- brokerURL := fmt.Sprintf("tcp://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
- if mqtt_saslEnable {
- if mqtt_username != "" {
- connOpts.SetUsername(mqtt_username)
- if mqtt_saslpassword != "" {
- connOpts.SetPassword(mqtt_saslpassword)
- }
- }
- }
- if mqtt_tlsEnable {
- cer, err := tls.LoadX509KeyPair(mqtt_clientcert, mqtt_clientkey)
- check(err)
- connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
- brokerURL = fmt.Sprintf("tcps://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
- }
- connOpts.AddBroker(brokerURL)
- connOpts.OnConnect = func(c MQTT.Client) {
- if token := c.Subscribe(mqtt_topic, byte(mqtt_qos), onMessageReceived); token.Wait() && token.Error() != nil {
- fmt.Println("panic(token.Error())", token.Error())
- }
- }
- label:
- mqttClient := MQTT.NewClient(connOpts)
- if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
- if showlog > 0 {
- fmt.Println("token.Error(),err", token.Error())
- }
- logger.Errorln("token.Error(),err", token.Error())
- time.Sleep(time.Second)
- goto label
- //panic(token.Error())
- }
- log.Println("[MQTT] Connected")
- go func() {
- for {
- select {
- case <-ctx.Done():
- fmt.Println("mqtt 退出")
- logger.Errorln("mqtt 退出")
- return
- default:
- PopPeeks, err, full, null := DqueueIndexMqtt.PopPeeksV2(1)
- fmt.Println("DqueueIndexMqtt.PopPeeksV2(1)", len(PopPeeks))
- if err != nil {
- logger.Errorln("PopPeeks,err ", err)
- } else {
- if len(PopPeeks) == 0 {
- if !full && null {
- DqueueIndexMqtt.PopOneIndex()
- }
- if showlog > 0 {
- fmt.Println("PopPeeks,null,", PopPeeks)
- }
- time.Sleep(time.Second)
- continue
- }
- // var pop []byte
- labelDb:
- if mqttDb == nil {
- mqttDb, err = GetmyDbsConnect(user, password, host, port, tableDB)
- if err != nil {
- logger.Errorln("GetmyDbsConnect err:", err)
- time.Sleep(time.Second)
- goto labelDb
- }
- }
- //fmt.Println("mqttDb.Begin()")
- tx, err := mqttDb.Begin()
- if err == nil {
- //fmt.Println("tx, err := mqttDb.Begin() err", err)
- decode, _ := snappy.Decode(nil, PopPeeks[0])
- fmt.Println("encode", string(PopPeeks[0]))
- var item1 []TransferItem
- err = msgpack.Unmarshal(decode, &item1)
- if err != nil {
- log.Println("Unmarshal panic(err)", err)
- }
- // fmt.Println(item1)
- count := 0
- str := ""
- errstr := ""
- for _, value := range item1 {
- sqls := strings.Split(string(value.Content), ";")
- for _, sql := range sqls {
- //err = ExecT(tx, string(value.Content))
- if sql == "" {
- continue
- }
- sql = strings.Replace(sql, "\\", "\\\\", -1)
- if strings.Index(sql, "downloadedplan") > 0 {
- fmt.Println(sql)
- }
- logger.Infoln(sql, "test")
- err = ExecT(tx, sql)
- // db, err := tx.Exec(sql)
- //fmt.Println("string(value.Content)",string(value.Content))
- if err == nil {
- //logger.Infoln(fmt.Sprintf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), sqlline))
- //fmt.Println( "ExecT---" + string(value.Content))
- } else {
- logger.Errorln(err.Error() + "---" + sql)
- count++
- str = string(value.Content)
- errstr = err.Error()
- //fmt.Println(err.Error() + "---" + string(value.Content))
- }
- }
- }
- if count > 0 {
- count = 0
- logger.Errorln(errstr + "---" + str)
- }
- if showlog > 0 {
- log.Println("PopPeeks, %d \n\t", len(item1))
- }
- } else {
- if showlog > 0 {
- log.Println("mqttDb.Begin(),err", err)
- }
- }
- err = tx.Commit()
- if err == nil {
- if !full && null {
- for i := 0; i < len(PopPeeks)+1; i++ {
- _, _ = DqueueIndexMqtt.PopOneIndex()
- if showlog > 0 {
- //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()1", string(pop), err)
- }
- }
- } else {
- for i := 0; i < len(PopPeeks); i++ {
- _, _ = DqueueIndexMqtt.PopOneIndex()
- if showlog > 0 {
- //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()2", string(pop), err)
- }
- }
- }
- } else {
- logger.Errorln("tx.Commit(),err", err)
- if showlog > 0 {
- fmt.Println("tx.Commit(),err", err)
- }
- }
- }
- }
- }
- }()
- }
- func check(err error) {
- if err != nil {
- logger.Errorln("err :", err.Error())
- }
- }
- //func (m *Client) Pub() {
- // fmt.Println("pub")
- // s := time.Now().String()
- // token := m.Client.Publish("date", 0, false, s)
- // if token.Wait() && token.Error() != nil {
- // fmt.Println(token.Error())
- // }
- //}
// Xxw ("新希望", New Hope) holds the two columns read from the xxw table by
// Mqtt_producerXxw.
type Xxw struct {
	id      int64  // value of the id column
	xxw_sql string // value of the xxw_sql column; published as the MQTT payload
}
- func Mq_ConsumerXxw(ctx context.Context) {
- var err error
- DqueueIndexMqtt, err = OpenIndexFile(CurrentPath+"logs", 100000, 10000, false)
- if err != nil {
- fmt.Printf("create DqueueIndex err %v", err)
- os.Exit(1)
- }
- fmt.Println("Mqtt Clint 启动")
- connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves.
- connOpts.SetClientID(mqtt_nodevalue)
- connOpts.SetMaxReconnectInterval(1 * time.Second)
- connOpts.SetCleanSession(mqtt_CleanSession)
- connOpts.AutoReconnect = true
- brokerURL := fmt.Sprintf("tcp://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
- println("brokerURL", brokerURL)
- if mqtt_saslEnable {
- if mqtt_username != "" {
- connOpts.SetUsername(mqtt_username)
- if mqtt_saslpassword != "" {
- connOpts.SetPassword(mqtt_saslpassword)
- }
- }
- }
- if mqtt_tlsEnable {
- cer, err := tls.LoadX509KeyPair(mqtt_clientcert, mqtt_clientkey)
- check(err)
- connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
- brokerURL = fmt.Sprintf("tcps://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
- }
- connOpts.AddBroker(brokerURL)
- connOpts.OnConnect = func(c MQTT.Client) {
- if token := c.Subscribe(mqtt_topic, byte(mqtt_qos), onMessageReceived); token.Wait() && token.Error() != nil {
- fmt.Println("panic(token.Error())", token.Error())
- }
- }
- label:
- mqttClient := MQTT.NewClient(connOpts)
- if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
- if showlog > 0 {
- fmt.Println("token.Error(),err", token.Error())
- }
- logger.Errorln("token.Error(),err", token.Error())
- time.Sleep(time.Second)
- goto label
- }
- log.Println("[MQTT] Connected")
- go func() {
- for {
- select {
- case <-ctx.Done():
- fmt.Println("mqtt 退出")
- logger.Errorln("mqtt 退出")
- return
- default:
- PopPeeks, err, full, null := DqueueIndexMqtt.PopPeeksV2(1)
- if err != nil {
- logger.Errorln("PopPeeks,err ", err)
- } else {
- // fmt.Println(PopPeeks)
- if len(PopPeeks) == 0 {
- if !full && null {
- DqueueIndexMqtt.PopOneIndex()
- }
- if showlog > 0 {
- fmt.Println("PopPeeks,null,", PopPeeks)
- }
- time.Sleep(time.Second)
- continue
- }
- labelDb:
- if mqttDb == nil {
- mqttDb, err = GetmyDbsConnect(user, password, host, port, tableDB)
- if err != nil {
- logger.Errorln("GetmyDbsConnect err:", err)
- time.Sleep(time.Second)
- goto labelDb
- }
- }
- tx, err := mqttDb.Begin()
- if err == nil {
- count := 0
- str := ""
- errstr := ""
- sqls := strings.Split(string(PopPeeks[0]), ";")
- fmt.Println(sqls, time.Now())
- for _, sql := range sqls {
- // fmt.Println(sql)
- //err = ExecT(tx, string(value.Content))
- fmt.Println(sqls, time.Now(), strings.Index(sql, "kptCattleId"+KptCattleId))
- logger.Errorln(sqls, time.Now(), strings.Index(sql, "kptCattleId"+KptCattleId))
- if sql == "" || strings.Index(sql, "kptCattleId"+KptCattleId) == -1 {
- continue
- }
- sql = strings.Replace(sql, "kptCattleId"+KptCattleId, "", -1)
- sql = strings.Replace(sql, "\\", "\\\\", -1)
- err = ExecT(tx, sql)
- if err == nil {
- logger.Infoln(" -- ", sql)
- //fmt.Println( "ExecT---" + string(value.Content))
- } else {
- logger.Errorln(err.Error() + "---" + sql)
- count++
- errstr = err.Error()
- //fmt.Println(err.Error() + "---" + string(value.Content))
- }
- }
- if count > 0 {
- count = 0
- logger.Errorln(errstr + "---" + str)
- }
- } else {
- if showlog > 0 {
- log.Println("mqttDb.Begin(),err", err)
- }
- }
- err = tx.Commit()
- if err == nil {
- if !full && null {
- for i := 0; i < len(PopPeeks)+1; i++ {
- _, _ = DqueueIndexMqtt.PopOneIndex()
- if showlog > 0 {
- //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()1", string(pop), err)
- }
- }
- } else {
- for i := 0; i < len(PopPeeks); i++ {
- _, _ = DqueueIndexMqtt.PopOneIndex()
- if showlog > 0 {
- //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()2", string(pop), err)
- }
- }
- }
- } else {
- logger.Errorln("tx.Commit(),err", err)
- if showlog > 0 {
- fmt.Println("tx.Commit(),err", err)
- }
- }
- }
- }
- }
- }()
- }
- func Mqtt_producerXxw(ctx context.Context) { //0824数据同步
- fmt.Println("Mqtt Clint 启动1")
- // var reconnectHandler mqtt.ReconnectHandler = func(client mqtt.Client, opts *mqtt.ClientOptions) {
- // fmt.Println("Reconnecting to MQTT broker...")
- // if token := client.Connect(); token.Wait() && token.Error() != nil {
- // log.Fatalf("Failed to reconnect: %v", token.Error())
- // }
- // }
- println(user, password, host, port, tableDB)
- retained := flag.Bool("retained", false, "Are the messages sent with the retained flag")
- connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves.
- connOpts.SetClientID(mqtt_nodevalue)
- connOpts.SetMaxReconnectInterval(1 * time.Second)
- connOpts.SetCleanSession(mqtt_CleanSession)
- connOpts.AutoReconnect = true
- // connOpts.OnReconnecting = reconnectHandler
- brokerURL := fmt.Sprintf("tcp://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
- if mqtt_saslEnable {
- if mqtt_username != "" {
- connOpts.SetUsername(mqtt_username)
- if mqtt_saslpassword != "" {
- connOpts.SetPassword(mqtt_saslpassword)
- }
- }
- }
- logger.Info("开始")
- if mqtt_tlsEnable {
- cer, err := tls.LoadX509KeyPair(mqtt_clientcert, mqtt_clientkey)
- check(err)
- connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
- brokerURL = fmt.Sprintf("tcps://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
- }
- connOpts.OnConnect = func(c MQTT.Client) {
- }
- connOpts.AddBroker(brokerURL)
- mqttClient := MQTT.NewClient(connOpts)
- if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
- logger.Errorln("[MQTT] Connected")
- for {
- select {
- case <-ctx.Done():
- fmt.Println("mqtt 退出")
- return
- default:
- var err error
- labelDb:
- if mqttDb == nil {
- mqttDb, err = GetmyDbsConnect(user, password, host, port, tableDB)
- if err != nil {
- logger.Errorln("GetmyDbsConnect err:", err)
- time.Sleep(time.Second)
- goto labelDb
- }
- }
- //fmt.Println("mqttDb.Begin()")
- tx, err := mqttDb.Begin()
- xxw := Xxw{}
- row := tx.QueryRow("select id,xxw_sql from xxw where isused=0 order by id limit 1")
- if err != nil {
- logger.Info("Query err", err)
- }
- err = row.Scan(&xxw.id, &xxw.xxw_sql)
- if xxw.id != 0 && xxw.xxw_sql != "" {
- token := mqttClient.Publish(mqtt_topic, byte(mqtt_qos), *retained, xxw.xxw_sql)
- if token.Wait() && token.Error() != nil {
- logger.Info("Client.Publish err:--- ", err)
- } else {
- logger.Infoln("Client.Publish success")
- _, err = tx.Exec("update xxw set isused=1 where id=?", xxw.id)
- if err != nil {
- logger.Info("Exec err", err)
- }
- }
- } else {
- time.Sleep(time.Second)
- }
- err = tx.Commit()
- if err != nil {
- tx.Rollback()
- logger.Info("Commit err", err)
- }
- }
- }
- }
|