interface.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package mqtt
  2. import (
  3. "fmt"
  4. "kpt-pasture/config"
  5. "kpt-pasture/util"
  6. "os"
  7. "os/signal"
  8. "syscall"
  9. "time"
  10. handleMqtt "kpt-pasture/module/mqtt"
  11. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  12. golangMqtt "github.com/eclipse/paho.mqtt.golang"
  13. mqtt "github.com/eclipse/paho.mqtt.golang"
  14. "go.uber.org/zap"
  15. )
  16. //var Module = di.Options(di.Provide(NewServer))
  17. type IMqttClient struct {
  18. golangMqtt.Client
  19. Config config.MqttSetting
  20. }
  21. type IMqttServer interface {
  22. Consumer(func([]byte))
  23. Producer(top string, qos int32, data []byte) error
  24. Run(handleMqtt.Entry)
  25. Close()
  26. }
  27. var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
  28. zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
  29. }
  30. var connectHandler = func(client golangMqtt.Client) {
  31. zaplog.Info("connectHandler", zap.Any("client", client))
  32. }
  33. var connectLostHandler = func(client golangMqtt.Client, err error) {
  34. zaplog.Error("connectLost-err", zap.Any("err", err.Error()))
  35. for {
  36. token := client.Connect()
  37. if token.Wait() && token.Error() == nil {
  38. // 成功重连,更新全局客户端实例
  39. connectHandler(client)
  40. break
  41. }
  42. zaplog.Error("ConnectionRetry", zap.Any("err", token.Error()))
  43. time.Sleep(5 * time.Second)
  44. }
  45. zaplog.Error("connectLost-success")
  46. }
  47. func NewServer(config *config.AppConfig) IMqttServer {
  48. conf := config.Mqtt
  49. opts := golangMqtt.NewClientOptions()
  50. opts.AddBroker(fmt.Sprintf("tcp://%s", conf.Broker))
  51. opts.SetClientID(util.GenerateRandomNumberString(16))
  52. opts.SetUsername(conf.UserName)
  53. opts.SetPassword(conf.Password)
  54. opts.SetCleanSession(false)
  55. opts.SetConnectRetry(true)
  56. opts.SetConnectRetryInterval(5 * time.Minute)
  57. opts.SetKeepAlive(time.Second * time.Duration(conf.KeepAlive))
  58. opts.SetAutoReconnect(conf.AutoReconnect)
  59. opts.SetDefaultPublishHandler(messagePubHandler)
  60. opts.OnConnect = connectHandler
  61. opts.OnConnectionLost = connectLostHandler
  62. client := golangMqtt.NewClient(opts)
  63. if token := client.Connect(); token.Wait() && token.Error() != nil {
  64. panic(token.Error())
  65. }
  66. return &IMqttClient{Client: client, Config: conf}
  67. }
  68. func (s *IMqttClient) Close() {
  69. if s.Client.IsConnected() {
  70. s.Client.Disconnect(250)
  71. }
  72. }
  73. func (s *IMqttClient) Run(enter handleMqtt.Entry) {
  74. // 设置信号监听以优雅关闭服务器
  75. stop := make(chan os.Signal, 1)
  76. signal.Notify(stop, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
  77. go func() {
  78. for {
  79. select {
  80. // 等待停止信号
  81. case <-stop:
  82. s.Close()
  83. return
  84. }
  85. }
  86. }()
  87. // 订阅主题
  88. buffer := bufferPool.Get().([]byte)
  89. if token := s.Client.Subscribe(s.Config.SubTopic, byte(s.Config.Qos), func(client mqtt.Client, msg mqtt.Message) {
  90. copy(buffer, msg.Payload())
  91. select {
  92. case readMsgChan <- buffer[:len(msg.Payload())]:
  93. bufferPool.Put(buffer)
  94. }
  95. }); token.Wait() && token.Error() != nil {
  96. zaplog.Error("Consumer", zap.Any("topic", s.Config.SubTopic), zap.Any("err", token.Error()))
  97. }
  98. // 启动数据处理
  99. s.Consumer(enter.NeckRingHandle)
  100. }