package kpt import ( "context" "crypto/tls" "flag" "fmt" MQTT "github.com/eclipse/paho.mqtt.golang" "github.com/golang/snappy" "github.com/vmihailenco/msgpack/v5" "log" "os" "strconv" "strings" "time" ) var ( mqtt_host string mqtt_port int mqtt_path string mqtt_topic string mqtt_qos int mqtt_saslEnable bool mqtt_username string mqtt_saslpassword string mqtt_tlsEnable bool mqtt_clientcert string mqtt_clientkey string mqtt_cacert string mqtt_nodevalue string mqtt_CleanSession bool mqtt_Breaki int mqtt_onetimerows string temp_i int = 0 ) type SqlItem struct { Sql string } type TransferItem struct { Ftype string Content []byte } func SqlItemBuilder() interface{} { return &SqlItem{} } func onMessageReceived(client MQTT.Client, message MQTT.Message) { err := DqueueIndexMqtt.PushOneIndex(message.Payload()[:]) if err != nil { logger.Errorln("DqueueIndexMqtt.PushOneIndex,err", err) } } func Mqtt_producerDB(ctx context.Context) { fmt.Println("Mqtt Clint 启动") retained := flag.Bool("retained", false, "Are the messages sent with the retained flag") connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves. connOpts.SetClientID(mqtt_nodevalue) connOpts.SetMaxReconnectInterval(1 * time.Second) connOpts.SetCleanSession(mqtt_CleanSession) connOpts.AutoReconnect = true brokerURL := fmt.Sprintf("tcp://%s:%d%s", mqtt_host, mqtt_port, mqtt_path) int_, err := strconv.Atoi(onetimerows) if err == nil { } else { logger.Errorln("转换onetimerowsMqtt错误 \r\n", err.Error()) } if mqtt_saslEnable { if mqtt_username != "" { connOpts.SetUsername(mqtt_username) if mqtt_saslpassword != "" { connOpts.SetPassword(mqtt_saslpassword) } } } if mqtt_tlsEnable { cer, err := tls.LoadX509KeyPair(mqtt_clientcert, mqtt_clientkey) if err != nil { println("LoadX509KeyPair err :",err.Error()) } connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}}) brokerURL = fmt.Sprintf("tcps://%s:%d%s", mqtt_host, mqtt_port, mqtt_path) } connOpts.OnConnect = func(c MQTT.Client) { //if token := c.Subscribe(mqtt_topic, byte(mqtt_qos), onMessageReceived); token.Wait() && token.Error() != nil { // panic(token.Error()) //} } connOpts.OnConnectionLost = func (c MQTT.Client, reason error) { println("onlost reason",reason.Error()) } connOpts.AddBroker(brokerURL) //connOpts.OnConnect = func(c MQTT.Client) { // if token := c.Subscribe(mqtt_topic, byte(mqtt_qos), onMessageReceived); token.Wait() && token.Error() != nil { // //panic(token.Error()) // fmt.Println("panic(token.Error())",token.Error()) // }else{ // //fmt.Println("",token,token.Error(),mqtt_topic) // } // //} mqttClient := MQTT.NewClient(connOpts) if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } logger.Errorln("[MQTT] Connected",) for { select { case <-ctx.Done(): fmt.Println("mqtt 退出") return default: PopPeeks, err, full, null := DqueueIndex.PopPeeksV2(int_) if err != nil { fmt.Println("PopPeeks,err ", err) } else { if len(PopPeeks) == 0 { if !full && null { DqueueIndex.PopOneIndex() } time.Sleep(time.Second) continue } //fmt.Println("PopPeeks==",string(PopPeeks[0])) datasSlice := make([]TransferItem, len(PopPeeks)) for key, value := range PopPeeks { //println("resql =========",string(value)) datasSlice[key].Content = value } b1, err := msgpack.Marshal(&datasSlice) if err != nil { panic(err) } //fmt.Println("encode", len(b1)) encode := snappy.Encode(nil, b1) logger.Infoln("encode", len(encode)) //println("encode",len(encode)) token := mqttClient.Publish(mqtt_topic, byte(mqtt_qos), *retained, encode) //if token.Wait() && token.Error() != nil { // fmt.Println("=============",token.Error()) //} if token.Wait() && token.Error() != nil { if showlog > 0 { fmt.Println(" mqttClient.Publish err:", mqtt_topic, string(PopPeeks[0]), err) } logger.Errorln("Client.Publish err:--- ", token.Error()) } else { temp_i++ //fmt.Println(temp_i) logger.Infoln("Client.Publish len:--- ", len(encode)) if !full && null { for i := 0; i < len(PopPeeks)+1; i++ { _, _ = DqueueIndex.PopOneIndex() if showlog > 0 { //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()1", string(pop), err) } } } else { for i := 0; i < len(PopPeeks); i++ { _, _ = DqueueIndex.PopOneIndex() if showlog > 0 { //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()1", string(pop), err) } } } } } } } } func Mq_Consumer(ctx context.Context) { var err error DqueueIndexMqtt, err = OpenIndexFile(CurrentPath+"logs", 100000, 10000, false) if err != nil { fmt.Printf("create DqueueIndex err %v", err) os.Exit(1) } fmt.Println("Mqtt Clint 启动") connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves. connOpts.SetClientID(mqtt_nodevalue) connOpts.SetMaxReconnectInterval(1 * time.Second) connOpts.SetCleanSession(mqtt_CleanSession) connOpts.AutoReconnect = true brokerURL := fmt.Sprintf("tcp://%s:%d%s", mqtt_host, mqtt_port, mqtt_path) if mqtt_saslEnable { if mqtt_username != "" { connOpts.SetUsername(mqtt_username) if mqtt_saslpassword != "" { connOpts.SetPassword(mqtt_saslpassword) } } } if mqtt_tlsEnable { cer, err := tls.LoadX509KeyPair(mqtt_clientcert, mqtt_clientkey) check(err) connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}}) brokerURL = fmt.Sprintf("tcps://%s:%d%s", mqtt_host, mqtt_port, mqtt_path) } connOpts.AddBroker(brokerURL) connOpts.OnConnect = func(c MQTT.Client) { if token := c.Subscribe(mqtt_topic, byte(mqtt_qos), onMessageReceived); token.Wait() && token.Error() != nil { fmt.Println("panic(token.Error())", token.Error()) } } label: mqttClient := MQTT.NewClient(connOpts) if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { if showlog > 0 { fmt.Println("token.Error(),err", token.Error()) } logger.Errorln("token.Error(),err", token.Error()) time.Sleep(time.Second) goto label //panic(token.Error()) } log.Println("[MQTT] Connected") go func() { for { select { case <-ctx.Done(): fmt.Println("mqtt 退出") logger.Errorln("mqtt 退出") return default: PopPeeks, err, full, null := DqueueIndexMqtt.PopPeeksV2(1) //fmt.Println("DqueueIndexMqtt.PopPeeksV2(1)",len(PopPeeks)) if err != nil { logger.Errorln("PopPeeks,err ", err) } else { if len(PopPeeks) == 0 { if !full && null { DqueueIndexMqtt.PopOneIndex() } if showlog > 0 { fmt.Println("PopPeeks,null,", PopPeeks) } time.Sleep(time.Second) continue } // var pop []byte labelDb: if mqttDb == nil { mqttDb, err = GetmyDbsConnect(user, password, host, port, tableDB) if err != nil { logger.Errorln("GetmyDbsConnect err:", err) time.Sleep(time.Second) goto labelDb } } //fmt.Println("mqttDb.Begin()") tx, err := mqttDb.Begin() if err == nil { //fmt.Println("tx, err := mqttDb.Begin() err", err) decode, _ := snappy.Decode(nil,PopPeeks[0]) //fmt.Println("encode", len(decode)) var item1 []TransferItem err = msgpack.Unmarshal(decode, &item1) if err != nil { log.Println("Unmarshal panic(err)",err) } count :=0 str := "" errstr := "" for _, value := range item1 { sqls :=strings.Split(string(value.Content),";") for _, sql := range sqls { //err = ExecT(tx, string(value.Content)) if sql == ""{ continue } sql = strings.Replace(sql,"\\","\\\\",-1) err = ExecT(tx, sql) //fmt.Println("string(value.Content)",string(value.Content)) if err == nil { //logger.Infoln(fmt.Sprintf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), sqlline)) //fmt.Println( "ExecT---" + string(value.Content)) } else { logger.Errorln(err.Error() + "---" + sql) count ++ str =string(value.Content) errstr = err.Error() //fmt.Println(err.Error() + "---" + string(value.Content)) } } } if count>0{ count=0 logger.Errorln(errstr + "---" + str) } if showlog > 0 { log.Println("PopPeeks, %d \n\t", len(item1)) } }else { if showlog > 0 { log.Println("mqttDb.Begin(),err", err) } } err = tx.Commit() if err == nil { if !full && null { for i := 0; i < len(PopPeeks)+1; i++ { _, _ = DqueueIndexMqtt.PopOneIndex() if showlog > 0 { //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()1", string(pop), err) } } } else { for i := 0; i < len(PopPeeks); i++ { _, _ = DqueueIndexMqtt.PopOneIndex() if showlog > 0 { //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()2", string(pop), err) } } } } else { logger.Errorln("tx.Commit(),err", err) if showlog > 0 { fmt.Println("tx.Commit(),err", err) } } } } } }() } func check(err error) { if err != nil { logger.Errorln("err :", err.Error()) } } //func (m *Client) Pub() { // fmt.Println("pub") // s := time.Now().String() // token := m.Client.Publish("date", 0, false, s) // if token.Wait() && token.Error() != nil { // fmt.Println(token.Error()) // } //} //新希望 type Xxw struct { id int64 xxw_sql string } func Mq_ConsumerXxw(ctx context.Context) { var err error DqueueIndexMqtt, err = OpenIndexFile(CurrentPath+"logs", 100000, 10000, false) if err != nil { fmt.Printf("create DqueueIndex err %v", err) os.Exit(1) } fmt.Println("Mqtt Clint 启动") connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves. connOpts.SetClientID(mqtt_nodevalue) connOpts.SetMaxReconnectInterval(1 * time.Second) connOpts.SetCleanSession(mqtt_CleanSession) connOpts.AutoReconnect = true brokerURL := fmt.Sprintf("tcp://%s:%d%s", mqtt_host, mqtt_port, mqtt_path) println("brokerURL",brokerURL) if mqtt_saslEnable { if mqtt_username != "" { connOpts.SetUsername(mqtt_username) if mqtt_saslpassword != "" { connOpts.SetPassword(mqtt_saslpassword) } } } if mqtt_tlsEnable { cer, err := tls.LoadX509KeyPair(mqtt_clientcert, mqtt_clientkey) check(err) connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}}) brokerURL = fmt.Sprintf("tcps://%s:%d%s", mqtt_host, mqtt_port, mqtt_path) } connOpts.AddBroker(brokerURL) connOpts.OnConnect = func(c MQTT.Client) { if token := c.Subscribe(mqtt_topic, byte(mqtt_qos), onMessageReceived); token.Wait() && token.Error() != nil { fmt.Println("panic(token.Error())", token.Error()) } } label: mqttClient := MQTT.NewClient(connOpts) if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { if showlog > 0 { fmt.Println("token.Error(),err", token.Error()) } logger.Errorln("token.Error(),err", token.Error()) time.Sleep(time.Second) goto label } log.Println("[MQTT] Connected") go func() { for { select { case <-ctx.Done(): fmt.Println("mqtt 退出") logger.Errorln("mqtt 退出") return default: PopPeeks, err, full, null := DqueueIndexMqtt.PopPeeksV2(1) if err != nil { logger.Errorln("PopPeeks,err ", err) } else { if len(PopPeeks) == 0 { if !full && null { DqueueIndexMqtt.PopOneIndex() } if showlog > 0 { fmt.Println("PopPeeks,null,", PopPeeks) } time.Sleep(time.Second) continue } labelDb: if mqttDb == nil { mqttDb, err = GetmyDbsConnect(user, password, host, port, tableDB) if err != nil { logger.Errorln("GetmyDbsConnect err:", err) time.Sleep(time.Second) goto labelDb } } tx, err := mqttDb.Begin() if err == nil { count :=0 str := "" errstr := "" sqls :=strings.Split(string(PopPeeks[0]),";") for _, sql := range sqls { //err = ExecT(tx, string(value.Content)) if sql == "" || strings.Index(sql, "kptCattleId"+KptCattleId) == -1{ continue } sql = strings.Replace(sql,"kptCattleId"+KptCattleId,"",-1) sql = strings.Replace(sql,"\\","\\\\",-1) err = ExecT(tx, sql) if err == nil { logger.Infoln(" -- ",sql) //fmt.Println( "ExecT---" + string(value.Content)) } else { logger.Errorln(err.Error() + "---" + sql) count ++ errstr = err.Error() //fmt.Println(err.Error() + "---" + string(value.Content)) } } if count>0{ count=0 logger.Errorln(errstr + "---" + str) } }else { if showlog > 0 { log.Println("mqttDb.Begin(),err", err) } } err = tx.Commit() if err == nil { if !full && null { for i := 0; i < len(PopPeeks)+1; i++ { _, _ = DqueueIndexMqtt.PopOneIndex() if showlog > 0 { //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()1", string(pop), err) } } } else { for i := 0; i < len(PopPeeks); i++ { _, _ = DqueueIndexMqtt.PopOneIndex() if showlog > 0 { //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()2", string(pop), err) } } } } else { logger.Errorln("tx.Commit(),err", err) if showlog > 0 { fmt.Println("tx.Commit(),err", err) } } } } } }() } func Mqtt_producerXxw(ctx context.Context) { //0824数据同步 fmt.Println("Mqtt Clint 启动1") println(user, password, host, port, tableDB) retained := flag.Bool("retained", false, "Are the messages sent with the retained flag") connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves. connOpts.SetClientID(mqtt_nodevalue) connOpts.SetMaxReconnectInterval(1 * time.Second) connOpts.SetCleanSession(mqtt_CleanSession) connOpts.AutoReconnect = true brokerURL := fmt.Sprintf("tcp://%s:%d%s", mqtt_host, mqtt_port, mqtt_path) if mqtt_saslEnable { if mqtt_username != "" { connOpts.SetUsername(mqtt_username) if mqtt_saslpassword != "" { connOpts.SetPassword(mqtt_saslpassword) } } } logger.Info("开始") if mqtt_tlsEnable { cer, err := tls.LoadX509KeyPair(mqtt_clientcert, mqtt_clientkey) check(err) connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}}) brokerURL = fmt.Sprintf("tcps://%s:%d%s", mqtt_host, mqtt_port, mqtt_path) } connOpts.OnConnect = func(c MQTT.Client) { } connOpts.AddBroker(brokerURL) mqttClient := MQTT.NewClient(connOpts) if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } logger.Errorln("[MQTT] Connected") for { select { case <-ctx.Done(): fmt.Println("mqtt 退出") return default: var err error labelDb: if mqttDb == nil { mqttDb, err = GetmyDbsConnect(user, password, host, port, tableDB) if err != nil { logger.Errorln("GetmyDbsConnect err:", err) time.Sleep(time.Second) goto labelDb } } //fmt.Println("mqttDb.Begin()") tx, err := mqttDb.Begin() xxw := Xxw{} row :=tx.QueryRow("select id,xxw_sql from xxw where isused=0 order by id limit 1") if err != nil{ logger.Info("Query err",err) } err =row.Scan(&xxw.id,&xxw.xxw_sql) if xxw.id !=0 && xxw.xxw_sql !="" { token := mqttClient.Publish(mqtt_topic, byte(mqtt_qos), *retained, xxw.xxw_sql) if token.Wait() && token.Error() != nil { logger.Info("Client.Publish err:--- ", err) } else { logger.Infoln("Client.Publish success") _,err=tx.Exec("update xxw set isused=1 where id=?",xxw.id) if err != nil{ logger.Info("Exec err",err) } } }else { time.Sleep(time.Second) } err=tx.Commit() if err != nil{ tx.Rollback() logger.Info("Commit err",err) } } } }