interface.go 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package mqtt2
  2. import (
  3. "fmt"
  4. "kpt-pasture/config"
  5. "kpt-pasture/util"
  6. "time"
  7. "gitee.com/xuyiping_admin/pkg/di"
  8. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  9. golangMqtt "github.com/eclipse/paho.mqtt.golang"
  10. mqtt "github.com/eclipse/paho.mqtt.golang"
  11. "go.uber.org/zap"
  12. )
  13. var Module = di.Options(di.Provide(NewServer))
  14. type MqttClient struct {
  15. golangMqtt.Client
  16. Config config.MqttSetting
  17. }
  18. type MqttServer interface {
  19. Consumer(func([]byte))
  20. Producer(top string, qos int32, data []byte) error
  21. Close()
  22. }
  23. var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
  24. zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
  25. }
  26. var connectHandler = func(client golangMqtt.Client, mqttClient *MqttClient) {
  27. fmt.Println("====connectHandler=======")
  28. buffer := bufferPool.Get().([]byte)
  29. if token := client.Subscribe(mqttClient.Config.SubTopic, byte(mqttClient.Config.Qos), func(client mqtt.Client, msg mqtt.Message) {
  30. copy(buffer, msg.Payload())
  31. select {
  32. case readMsgChan <- buffer[:len(msg.Payload())]:
  33. fmt.Println("====buffer=======", string(buffer))
  34. bufferPool.Put(buffer)
  35. }
  36. }); token.Wait() && token.Error() != nil {
  37. zaplog.Error("Consumer", zap.Any("topic", mqttClient.Config.SubTopic), zap.Any("err", token.Error()))
  38. }
  39. }
  40. var connectLostHandler = func(client golangMqtt.Client, err error, mqttClient *MqttClient) {
  41. zaplog.Info("connectLost", zap.Any("err", err.Error()))
  42. for {
  43. token := client.Connect()
  44. if token.Wait() && token.Error() == nil {
  45. // 成功重连,更新全局客户端实例
  46. connectHandler(client, mqttClient)
  47. return
  48. }
  49. zaplog.Error("ConnectionRetry", zap.Any("err", token.Error()))
  50. time.Sleep(5 * time.Second)
  51. }
  52. }
  53. func NewServer(config *config.AppConfig) MqttServer {
  54. conf := config.Mqtt
  55. opts := golangMqtt.NewClientOptions()
  56. opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
  57. opts.SetClientID(util.GenerateRandomNumberString(16))
  58. opts.SetUsername(conf.UserName)
  59. opts.SetPassword(conf.Password)
  60. opts.SetCleanSession(false)
  61. opts.SetConnectRetry(true)
  62. opts.SetConnectRetryInterval(5 * time.Minute)
  63. opts.SetKeepAlive(time.Second * time.Duration(conf.KeepAlive))
  64. opts.SetAutoReconnect(conf.AutoReconnect)
  65. opts.SetDefaultPublishHandler(messagePubHandler)
  66. client := golangMqtt.NewClient(opts)
  67. if token := client.Connect(); token.Wait() && token.Error() != nil {
  68. panic(token.Error())
  69. }
  70. mqttClient := &MqttClient{Client: client, Config: conf}
  71. opts.OnConnect = func(client mqtt.Client) {
  72. connectHandler(client, mqttClient)
  73. }
  74. opts.OnConnectionLost = func(client mqtt.Client, err error) {
  75. connectLostHandler(client, err, mqttClient)
  76. }
  77. return mqttClient
  78. }
  79. func (s *MqttClient) Close() {
  80. if s.Client.IsConnected() {
  81. s.Client.Disconnect(250)
  82. }
  83. }