package comm import ( "crypto/tls" "encoding/hex" "fmt" MQTT "github.com/eclipse/paho.mqtt.golang" "github.com/kptyun/KPTCOMM/pkg/setting" "time" ) var ( mqtt_Breaki int temp_i int = 0 StopCharMqtt chan int mqttClient MQTT.Client ) func onMessageReceived(client MQTT.Client, message MQTT.Message) { fmt.Println("Topic_up:", message.Payload()) tmpstr := make([]string, 2) tmpstr[0] = ProcessBuf(message.Payload()) if len(tmpstr) > 0 { _ = sendBuf(tmpstr, client) } else { client.Publish(setting.MqttSetting.Topic_Down, 1, false, message.Payload()) } } func MQPublish(client MQTT.Client, payload string) { if len(payload) == 0 { return } buftmp, err := getOutbytes(payload, "") if err == nil { token := client.Publish(setting.MqttSetting.Topic_Down, 1, false, buftmp) if token.Error() == nil { Logger.Infoln("MQTT_output:", hex.EncodeToString(buftmp)) if setting.DatabaseSetting.Showlog { fmt.Println("MQTT_output:", hex.EncodeToString(buftmp)) } } else { Logger.Errorln("MQTT_发布错误: ", token.Error()) } } } func Mq_Consumer() { defer func() { // 发生宕机时,获取panic传递的上下文并打印 if err := recover(); err != nil { fmt.Println("Mq_Consumer出错:", err) } }() connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves. connOpts.SetClientID(setting.MqttSetting.Nodevalue) connOpts.SetMaxReconnectInterval(1 * time.Second) connOpts.SetCleanSession(setting.MqttSetting.CleanSession) connOpts.AutoReconnect = true brokerURL := fmt.Sprintf("tcp://%s:%d%s", setting.MqttSetting.Host, setting.MqttSetting.Port, setting.MqttSetting.Path) if setting.MqttSetting.SaslEnable { if setting.MqttSetting.Username != "" { connOpts.SetUsername(setting.MqttSetting.Username) if setting.MqttSetting.Saslpassword != "" { connOpts.SetPassword(setting.MqttSetting.Saslpassword) } } } if setting.MqttSetting.TlsEnable { cer, err := tls.LoadX509KeyPair(setting.MqttSetting.Clientcert, setting.MqttSetting.Clientkey) check(err) connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}}) brokerURL = fmt.Sprintf("tcps://%s:%d%s", setting.MqttSetting.Host, setting.MqttSetting.Port, setting.MqttSetting.Path) } connOpts.AddBroker(brokerURL) connOpts.OnConnect = func(c MQTT.Client) { if token := c.Subscribe(setting.MqttSetting.Topic_Up, byte(setting.MqttSetting.Qos), onMessageReceived); token.Wait() && token.Error() != nil { Logger.Errorln(token.Error()) } } label: mqttClient = MQTT.NewClient(connOpts) if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { time.Sleep(time.Second) goto label } if setting.DatabaseSetting.Showlog { fmt.Println("[MQTT] Connected") } Logger.Infoln("[MQTT] Connected") for { select { case <-StopCharMqtt: fmt.Println("mqtt 退出") Logger.Errorln("mqtt 退出") break case <-time.After(10 * time.Microsecond): continue } } } func check(err error) { if err != nil { Logger.Errorln(err) } }