mqtt.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package comm
  2. import (
  3. "crypto/tls"
  4. "encoding/hex"
  5. "fmt"
  6. MQTT "github.com/eclipse/paho.mqtt.golang"
  7. "github.com/kptyun/KPTCOMM/pkg/setting"
  8. "time"
  9. )
  10. var (
  11. mqtt_Breaki int
  12. temp_i int = 0
  13. StopCharMqtt chan int
  14. mqttClient MQTT.Client
  15. )
  16. func onMessageReceived(client MQTT.Client, message MQTT.Message) {
  17. fmt.Println("Topic_up:", message.Payload())
  18. tmpstr := make([]string, 2)
  19. tmpstr[0] = ProcessBuf(message.Payload())
  20. if len(tmpstr) > 0 {
  21. _ = sendBuf(tmpstr, client)
  22. } else {
  23. client.Publish(setting.MqttSetting.Topic_Down, 1, false, message.Payload())
  24. }
  25. }
  26. func MQPublish(client MQTT.Client, payload string) {
  27. if len(payload) == 0 {
  28. return
  29. }
  30. buftmp, err := getOutbytes(payload, "")
  31. if err == nil {
  32. token := client.Publish(setting.MqttSetting.Topic_Down, 1, false, buftmp)
  33. if token.Error() == nil {
  34. Logger.Infoln("MQTT_output:", hex.EncodeToString(buftmp))
  35. if setting.DatabaseSetting.Showlog {
  36. fmt.Println("MQTT_output:", hex.EncodeToString(buftmp))
  37. }
  38. } else {
  39. Logger.Errorln("MQTT_发布错误: ", token.Error())
  40. }
  41. }
  42. }
  43. func Mq_Consumer() {
  44. defer func() {
  45. // 发生宕机时,获取panic传递的上下文并打印
  46. if err := recover(); err != nil {
  47. fmt.Println("Mq_Consumer出错:", err)
  48. }
  49. }()
  50. connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves.
  51. connOpts.SetClientID(setting.MqttSetting.Nodevalue)
  52. connOpts.SetMaxReconnectInterval(1 * time.Second)
  53. connOpts.SetCleanSession(setting.MqttSetting.CleanSession)
  54. connOpts.AutoReconnect = true
  55. brokerURL := fmt.Sprintf("tcp://%s:%d%s", setting.MqttSetting.Host, setting.MqttSetting.Port, setting.MqttSetting.Path)
  56. if setting.MqttSetting.SaslEnable {
  57. if setting.MqttSetting.Username != "" {
  58. connOpts.SetUsername(setting.MqttSetting.Username)
  59. if setting.MqttSetting.Saslpassword != "" {
  60. connOpts.SetPassword(setting.MqttSetting.Saslpassword)
  61. }
  62. }
  63. }
  64. if setting.MqttSetting.TlsEnable {
  65. cer, err := tls.LoadX509KeyPair(setting.MqttSetting.Clientcert, setting.MqttSetting.Clientkey)
  66. check(err)
  67. connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  68. brokerURL = fmt.Sprintf("tcps://%s:%d%s", setting.MqttSetting.Host, setting.MqttSetting.Port, setting.MqttSetting.Path)
  69. }
  70. connOpts.AddBroker(brokerURL)
  71. connOpts.OnConnect = func(c MQTT.Client) {
  72. if token := c.Subscribe(setting.MqttSetting.Topic_Up, byte(setting.MqttSetting.Qos), onMessageReceived); token.Wait() && token.Error() != nil {
  73. Logger.Errorln(token.Error())
  74. }
  75. }
  76. label:
  77. mqttClient = MQTT.NewClient(connOpts)
  78. if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
  79. time.Sleep(time.Second)
  80. goto label
  81. }
  82. if setting.DatabaseSetting.Showlog {
  83. fmt.Println("[MQTT] Connected")
  84. }
  85. Logger.Infoln("[MQTT] Connected")
  86. for {
  87. select {
  88. case <-StopCharMqtt:
  89. fmt.Println("mqtt 退出")
  90. Logger.Errorln("mqtt 退出")
  91. break
  92. case <-time.After(10 * time.Microsecond):
  93. continue
  94. }
  95. }
  96. }
  97. func check(err error) {
  98. if err != nil {
  99. Logger.Errorln(err)
  100. }
  101. }