123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- package comm
- import (
- "crypto/tls"
- "encoding/hex"
- "fmt"
- MQTT "github.com/eclipse/paho.mqtt.golang"
- "github.com/kptyun/KPTCOMM/pkg/setting"
- "time"
- )
- var (
- mqtt_Breaki int
- temp_i int = 0
- StopCharMqtt chan int
- mqttClient MQTT.Client
- )
- func onMessageReceived(client MQTT.Client, message MQTT.Message) {
- fmt.Println("Topic_up:", message.Payload())
- tmpstr := make([]string, 2)
- tmpstr[0] = ProcessBuf(message.Payload())
- if len(tmpstr) > 0 {
- _ = sendBuf(tmpstr, client)
- } else {
- client.Publish(setting.MqttSetting.Topic_Down, 1, false, message.Payload())
- }
- }
- func MQPublish(client MQTT.Client, payload string) {
- if len(payload) == 0 {
- return
- }
- buftmp, err := getOutbytes(payload, "")
- if err == nil {
- token := client.Publish(setting.MqttSetting.Topic_Down, 1, false, buftmp)
- if token.Error() == nil {
- Logger.Infoln("MQTT_output:", hex.EncodeToString(buftmp))
- if setting.DatabaseSetting.Showlog {
- fmt.Println("MQTT_output:", hex.EncodeToString(buftmp))
- }
- } else {
- Logger.Errorln("MQTT_发布错误: ", token.Error())
- }
- }
- }
- func Mq_Consumer() {
- defer func() {
- // 发生宕机时,获取panic传递的上下文并打印
- if err := recover(); err != nil {
- fmt.Println("Mq_Consumer出错:", err)
- }
- }()
- connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves.
- connOpts.SetClientID(setting.MqttSetting.Nodevalue)
- connOpts.SetMaxReconnectInterval(1 * time.Second)
- connOpts.SetCleanSession(setting.MqttSetting.CleanSession)
- connOpts.AutoReconnect = true
- brokerURL := fmt.Sprintf("tcp://%s:%d%s", setting.MqttSetting.Host, setting.MqttSetting.Port, setting.MqttSetting.Path)
- if setting.MqttSetting.SaslEnable {
- if setting.MqttSetting.Username != "" {
- connOpts.SetUsername(setting.MqttSetting.Username)
- if setting.MqttSetting.Saslpassword != "" {
- connOpts.SetPassword(setting.MqttSetting.Saslpassword)
- }
- }
- }
- if setting.MqttSetting.TlsEnable {
- cer, err := tls.LoadX509KeyPair(setting.MqttSetting.Clientcert, setting.MqttSetting.Clientkey)
- check(err)
- connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
- brokerURL = fmt.Sprintf("tcps://%s:%d%s", setting.MqttSetting.Host, setting.MqttSetting.Port, setting.MqttSetting.Path)
- }
- connOpts.AddBroker(brokerURL)
- connOpts.OnConnect = func(c MQTT.Client) {
- if token := c.Subscribe(setting.MqttSetting.Topic_Up, byte(setting.MqttSetting.Qos), onMessageReceived); token.Wait() && token.Error() != nil {
- Logger.Errorln(token.Error())
- }
- }
- label:
- mqttClient = MQTT.NewClient(connOpts)
- if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
- time.Sleep(time.Second)
- goto label
- }
- if setting.DatabaseSetting.Showlog {
- fmt.Println("[MQTT] Connected")
- }
- Logger.Infoln("[MQTT] Connected")
- for {
- select {
- case <-StopCharMqtt:
- fmt.Println("mqtt 退出")
- Logger.Errorln("mqtt 退出")
- break
- case <-time.After(10 * time.Microsecond):
- continue
- }
- }
- }
- func check(err error) {
- if err != nil {
- Logger.Errorln(err)
- }
- }
|