interface.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package mqtt
  2. import (
  3. "fmt"
  4. "kpt-pasture/config"
  5. mqtt2 "kpt-pasture/module/mqtt"
  6. "kpt-pasture/util"
  7. "os"
  8. "os/signal"
  9. "syscall"
  10. "time"
  11. "go.uber.org/dig"
  12. "gitee.com/xuyiping_admin/pkg/di"
  13. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  14. golangMqtt "github.com/eclipse/paho.mqtt.golang"
  15. mqtt "github.com/eclipse/paho.mqtt.golang"
  16. "go.uber.org/zap"
  17. )
  18. var Module = di.Options(di.Provide(NewServer))
  19. type IMqttClient struct {
  20. dig.In
  21. golangMqtt.Client
  22. Config config.MqttSetting
  23. MqttHub mqtt2.Entry // 处理数据
  24. }
  // IMqttServer is the interface through which NewServer exposes the MQTT
  // client to the rest of the application.
  type IMqttServer interface {
  	// Consumer takes a callback that accepts a raw message payload.
  	// (Presumably it is invoked once per received message — confirm against
  	// the implementation.)
  	Consumer(func([]byte))

  	// Producer takes a topic, a QoS level and a payload, and returns an error.
  	Producer(top string, qos int32, data []byte) error

  	// Run starts the server: signal handling, subscription and consumption.
  	Run()

  	// Close shuts the client connection down.
  	Close()
  }
  31. var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
  32. zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
  33. }
  34. var connectHandler = func(client golangMqtt.Client) {
  35. zaplog.Info("connectHandler", zap.Any("client", client))
  36. }
  37. var connectLostHandler = func(client golangMqtt.Client, err error) {
  38. zaplog.Error("connectLost-err", zap.Any("err", err.Error()))
  39. for {
  40. token := client.Connect()
  41. if token.Wait() && token.Error() == nil {
  42. // 成功重连,更新全局客户端实例
  43. connectHandler(client)
  44. break
  45. }
  46. zaplog.Error("ConnectionRetry", zap.Any("err", token.Error()))
  47. time.Sleep(5 * time.Second)
  48. }
  49. zaplog.Error("connectLost-success")
  50. }
  51. func NewServer(config *config.AppConfig) IMqttServer {
  52. conf := config.Mqtt
  53. opts := golangMqtt.NewClientOptions()
  54. opts.AddBroker(fmt.Sprintf("tcp://%s", conf.Broker))
  55. opts.SetClientID(util.GenerateRandomNumberString(16))
  56. opts.SetUsername(conf.UserName)
  57. opts.SetPassword(conf.Password)
  58. opts.SetCleanSession(false)
  59. opts.SetConnectRetry(true)
  60. opts.SetConnectRetryInterval(5 * time.Minute)
  61. opts.SetKeepAlive(time.Second * time.Duration(conf.KeepAlive))
  62. opts.SetAutoReconnect(conf.AutoReconnect)
  63. opts.SetDefaultPublishHandler(messagePubHandler)
  64. opts.OnConnect = connectHandler
  65. opts.OnConnectionLost = connectLostHandler
  66. client := golangMqtt.NewClient(opts)
  67. if token := client.Connect(); token.Wait() && token.Error() != nil {
  68. panic(token.Error())
  69. }
  70. return &IMqttClient{Client: client, Config: conf}
  71. }
  72. func (s *IMqttClient) Close() {
  73. if s.Client.IsConnected() {
  74. s.Client.Disconnect(250)
  75. }
  76. }
  77. func (s *IMqttClient) Run() {
  78. // 设置信号监听以优雅关闭服务器
  79. stop := make(chan os.Signal, 1)
  80. signal.Notify(stop, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
  81. go func() {
  82. for {
  83. select {
  84. // 等待停止信号
  85. case <-stop:
  86. s.Close()
  87. return
  88. }
  89. }
  90. }()
  91. // 订阅主题
  92. buffer := bufferPool.Get().([]byte)
  93. if token := s.Client.Subscribe(s.Config.SubTopic, byte(s.Config.Qos), func(client mqtt.Client, msg mqtt.Message) {
  94. copy(buffer, msg.Payload())
  95. select {
  96. case readMsgChan <- buffer[:len(msg.Payload())]:
  97. bufferPool.Put(buffer)
  98. }
  99. }); token.Wait() && token.Error() != nil {
  100. zaplog.Error("Consumer", zap.Any("topic", s.Config.SubTopic), zap.Any("err", token.Error()))
  101. }
  102. // 启动数据处理
  103. s.Consumer(s.MqttHub.NeckRingHandle)
  104. }