interface.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package mqtt
  2. import (
  3. "context"
  4. "fmt"
  5. "kpt-pasture/config"
  6. "kpt-pasture/util"
  7. "os"
  8. "os/signal"
  9. "syscall"
  10. "time"
  11. handleMqtt "kpt-pasture/module/mqtt"
  12. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  13. golangMqtt "github.com/eclipse/paho.mqtt.golang"
  14. mqtt "github.com/eclipse/paho.mqtt.golang"
  15. "go.uber.org/zap"
  16. )
  17. //var Module = di.Options(di.Provide(NewServer))
  18. type IMqttClient struct {
  19. golangMqtt.Client
  20. Config config.MqttSetting
  21. }
  22. type IMqttServer interface {
  23. Consumer(func([]byte))
  24. Producer(top string, qos int32, data []byte) error
  25. Run(handleMqtt.Entry)
  26. Close()
  27. }
  28. var (
  29. maxRetryAttempts = 500 // 最大重试次数
  30. initialRetryWait = 5 * time.Minute // 初始重试间隔
  31. maxRetryWait = 1 * time.Hour // 最大重试间隔
  32. )
  33. var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
  34. zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
  35. }
  36. var connectHandler = func(client golangMqtt.Client) {
  37. zaplog.Info("connectHandler", zap.Any("client", client))
  38. }
  39. var connectLostHandler = func(client golangMqtt.Client, err error) {
  40. zaplog.Error("connectLost-err", zap.Any("err", err.Error()))
  41. retryConnect(client, 0)
  42. }
  43. // 重试连接函数
  44. func retryConnect(client golangMqtt.Client, attempt int) {
  45. if attempt >= maxRetryAttempts {
  46. zaplog.Error("Max retry attempts reached, giving up")
  47. return
  48. }
  49. // 计算重试间隔(指数退避)
  50. retryWait := initialRetryWait * time.Duration(1<<attempt) // 2^attempt 指数增长
  51. if retryWait > maxRetryWait {
  52. retryWait = maxRetryWait
  53. }
  54. zaplog.Info("Retrying connection", zap.Any("attempt", attempt+1), zap.Any("waitTime", retryWait))
  55. time.Sleep(retryWait)
  56. // 尝试重新连接
  57. token := client.Connect()
  58. if token.Wait() && token.Error() == nil {
  59. zaplog.Info("Reconnected successfully")
  60. connectHandler(client)
  61. return
  62. }
  63. zaplog.Error("Connection retry failed", zap.Any("err", token.Error()))
  64. retryConnect(client, attempt+1) // 递归调用,继续重试
  65. }
  66. func NewServer(config *config.AppConfig) IMqttServer {
  67. conf := config.Mqtt
  68. opts := golangMqtt.NewClientOptions()
  69. opts.AddBroker(fmt.Sprintf("tcp://%s", conf.Broker))
  70. opts.SetClientID(util.GenerateRandomNumberString(16))
  71. opts.SetUsername(conf.UserName)
  72. opts.SetPassword(conf.Password)
  73. opts.SetCleanSession(false)
  74. opts.SetConnectRetry(true)
  75. opts.SetConnectRetryInterval(5 * time.Minute)
  76. opts.SetKeepAlive(time.Second * time.Duration(conf.KeepAlive))
  77. opts.SetAutoReconnect(conf.AutoReconnect)
  78. opts.SetDefaultPublishHandler(messagePubHandler)
  79. opts.OnConnect = connectHandler
  80. opts.OnConnectionLost = connectLostHandler
  81. client := golangMqtt.NewClient(opts)
  82. if token := client.Connect(); token.Wait() && token.Error() != nil {
  83. retryConnect(client, 0)
  84. }
  85. return &IMqttClient{Client: client, Config: conf}
  86. }
  87. func (s *IMqttClient) Close() {
  88. if s.Client.IsConnected() {
  89. s.Client.Disconnect(250)
  90. }
  91. }
  92. func (s *IMqttClient) Run(enter handleMqtt.Entry) {
  93. // 设置信号监听以优雅关闭服务器
  94. stop := make(chan os.Signal, 1)
  95. signal.Notify(stop, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
  96. // 创建上下文,用于优雅关闭
  97. ctx, cancel := context.WithCancel(context.Background())
  98. defer cancel()
  99. go func() {
  100. for {
  101. select {
  102. // 等待停止信号
  103. case <-stop:
  104. cancel()
  105. s.Close()
  106. return
  107. case <-ctx.Done():
  108. return
  109. }
  110. }
  111. }()
  112. // 订阅主题
  113. buffer := bufferPool.Get().([]byte)
  114. if token := s.Client.Subscribe(s.Config.SubTopic, byte(s.Config.Qos), func(client mqtt.Client, msg mqtt.Message) {
  115. copy(buffer, msg.Payload())
  116. select {
  117. case readMsgChan <- buffer[:len(msg.Payload())]:
  118. bufferPool.Put(buffer)
  119. case <-ctx.Done():
  120. return
  121. }
  122. }); token.Wait() && token.Error() != nil {
  123. zaplog.Error("Consumer", zap.Any("topic", s.Config.SubTopic), zap.Any("err", token.Error()))
  124. }
  125. // 启动数据处理
  126. s.Consumer(enter.NeckRingHandle)
  127. }