// Package mqtt builds the application's MQTT client on top of paho.mqtt.golang
// and exposes it to the dependency-injection container.
package mqtt

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"kpt-pasture/config"
	mqtt2 "kpt-pasture/module/mqtt"
	"kpt-pasture/util"

	"gitee.com/xuyiping_admin/pkg/di"
	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
	golangMqtt "github.com/eclipse/paho.mqtt.golang"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"go.uber.org/dig"
	"go.uber.org/zap"
)

// Module is the di option set that provides NewServer to the container.
var Module = di.Options(di.Provide(NewServer))

// IMqttClient is the concrete IMqttServer implementation. It embeds the paho
// client together with the MQTT settings it was built from.
type IMqttClient struct {
	dig.In
	golangMqtt.Client
	Config  config.MqttSetting
	MqttHub mqtt2.Entry // handles the received data
}

// IMqttServer is the behavior the rest of the application relies on.
// Consumer and Producer are implemented outside this file.
type IMqttServer interface {
	Consumer(func([]byte))
	Producer(top string, qos int32, data []byte) error
	Run()
	Close()
}

// messagePubHandler is installed as the default publish handler; it only logs
// the payload and topic of the message it receives.
var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
}

// connectHandler is installed as the OnConnect callback and logs the client.
var connectHandler = func(client golangMqtt.Client) {
	zaplog.Info("connectHandler", zap.Any("client", client))
}

// connectLostHandler is installed as the OnConnectionLost callback. It logs
// the error and then retries client.Connect every 5 seconds until one attempt
// succeeds.
//
// NOTE(review): when conf.AutoReconnect is enabled the paho client also has its
// own reconnect logic; confirm the two mechanisms do not interfere.
var connectLostHandler = func(client golangMqtt.Client, err error) {
	zaplog.Error("connectLost-err", zap.Any("err", err.Error()))
	for {
		token := client.Connect()
		if token.Wait() && token.Error() == nil {
			// Reconnected successfully; run the connect callback for this client.
			connectHandler(client)
			break
		}
		zaplog.Error("ConnectionRetry", zap.Any("err", token.Error()))
		time.Sleep(5 * time.Second)
	}
	// A successful reconnect is not an error, so report it at Info level.
	zaplog.Info("connectLost-success")
}

// NewServer builds the paho client from config.Mqtt, connects to the broker
// and returns the client wrapped as an IMqttServer.
//
// It panics when the initial Connect reports an error.
//
// NOTE(review): MqttHub is left at its zero value here, while Run reads
// s.MqttHub.NeckRingHandle; confirm it is populated before Run is called.
func NewServer(config *config.AppConfig) IMqttServer {
	conf := config.Mqtt

	opts := golangMqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s", conf.Broker))
	opts.SetClientID(util.GenerateRandomNumberString(16))
	opts.SetUsername(conf.UserName)
	opts.SetPassword(conf.Password)
	opts.SetCleanSession(false)
	opts.SetConnectRetry(true)
	opts.SetConnectRetryInterval(5 * time.Minute)
	opts.SetKeepAlive(time.Second * time.Duration(conf.KeepAlive))
	opts.SetAutoReconnect(conf.AutoReconnect)
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := golangMqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	return &IMqttClient{Client: client, Config: conf}
}

// Close disconnects the client if it is currently connected, passing a
// quiesce value of 250 to Disconnect.
func (s *IMqttClient) Close() {
	if s.Client.IsConnected() {
		s.Client.Disconnect(250)
	}
}

// Run installs a shutdown-signal watcher, subscribes to Config.SubTopic and
// then hands received payloads to MqttHub.NeckRingHandle through Consumer.
//
// A failed subscription is logged and Run still goes on to start Consumer.
func (s *IMqttClient) Run() {
	// Listen for termination signals so the client can be closed gracefully.
	// SIGKILL is deliberately not listed: it cannot be trapped by a process.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT)
	go func() {
		// Wait for the stop signal.
		<-stop
		s.Close()
	}()

	// Subscribe to the topic. Every message gets its own copy of the payload:
	// the receiver of readMsgChan keeps using the slice after the send, so a
	// buffer shared between callbacks (or returned to a pool right after the
	// send) would be overwritten while it is still being read.
	onMessage := func(client mqtt.Client, msg mqtt.Message) {
		raw := msg.Payload()
		payload := make([]byte, len(raw))
		copy(payload, raw)
		readMsgChan <- payload
	}
	if token := s.Client.Subscribe(s.Config.SubTopic, byte(s.Config.Qos), onMessage); token.Wait() && token.Error() != nil {
		zaplog.Error("Consumer", zap.Any("topic", s.Config.SubTopic), zap.Any("err", token.Error()))
	}

	// Start data processing.
	s.Consumer(s.MqttHub.NeckRingHandle)
}