package mqtt import ( "context" "fmt" "kpt-pasture/config" "kpt-pasture/util" "os" "os/signal" "syscall" "time" handleMqtt "kpt-pasture/module/mqtt" "gitee.com/xuyiping_admin/pkg/logger/zaplog" golangMqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang" "go.uber.org/zap" ) //var Module = di.Options(di.Provide(NewServer)) type IMqttClient struct { golangMqtt.Client Config config.MqttSetting } type IMqttServer interface { Consumer(func([]byte)) Producer(top string, qos int32, data []byte) error Run(handleMqtt.Entry) Close() } var ( maxRetryAttempts = 500 // 最大重试次数 initialRetryWait = 5 * time.Minute // 初始重试间隔 maxRetryWait = 1 * time.Hour // 最大重试间隔 ) var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) { zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic())) } var connectHandler = func(client golangMqtt.Client) { zaplog.Info("connectHandler", zap.Any("client", client)) } var connectLostHandler = func(client golangMqtt.Client, err error) { zaplog.Error("connectLost-err", zap.Any("err", err.Error())) retryConnect(client, 0) } // 重试连接函数 func retryConnect(client golangMqtt.Client, attempt int) { if attempt >= maxRetryAttempts { zaplog.Error("Max retry attempts reached, giving up") return } // 计算重试间隔(指数退避) retryWait := initialRetryWait * time.Duration(1< maxRetryWait { retryWait = maxRetryWait } zaplog.Info("Retrying connection", zap.Any("attempt", attempt+1), zap.Any("waitTime", retryWait)) time.Sleep(retryWait) // 尝试重新连接 token := client.Connect() if token.Wait() && token.Error() == nil { zaplog.Info("Reconnected successfully") connectHandler(client) return } zaplog.Error("Connection retry failed", zap.Any("err", token.Error())) retryConnect(client, attempt+1) // 递归调用,继续重试 } func NewServer(config *config.AppConfig) IMqttServer { conf := config.Mqtt opts := golangMqtt.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s", conf.Broker)) opts.SetClientID(util.GenerateRandomNumberString(16)) opts.SetUsername(conf.UserName) opts.SetPassword(conf.Password) opts.SetCleanSession(false) opts.SetConnectRetry(true) opts.SetConnectRetryInterval(5 * time.Minute) opts.SetKeepAlive(time.Second * time.Duration(conf.KeepAlive)) opts.SetAutoReconnect(conf.AutoReconnect) opts.SetDefaultPublishHandler(messagePubHandler) opts.OnConnect = connectHandler opts.OnConnectionLost = connectLostHandler client := golangMqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { retryConnect(client, 0) } return &IMqttClient{Client: client, Config: conf} } func (s *IMqttClient) Close() { if s.Client.IsConnected() { s.Client.Disconnect(250) } } func (s *IMqttClient) Run(enter handleMqtt.Entry) { // 设置信号监听以优雅关闭服务器 stop := make(chan os.Signal, 1) signal.Notify(stop, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) // 创建上下文,用于优雅关闭 ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { for { select { // 等待停止信号 case <-stop: cancel() s.Close() return case <-ctx.Done(): return } } }() // 订阅主题 buffer := bufferPool.Get().([]byte) if token := s.Client.Subscribe(s.Config.SubTopic, byte(s.Config.Qos), func(client mqtt.Client, msg mqtt.Message) { copy(buffer, msg.Payload()) select { case readMsgChan <- buffer[:len(msg.Payload())]: bufferPool.Put(buffer) case <-ctx.Done(): return } }); token.Wait() && token.Error() != nil { zaplog.Error("Consumer", zap.Any("topic", s.Config.SubTopic), zap.Any("err", token.Error())) } // 启动数据处理 s.Consumer(enter.NeckRingHandle) }