| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 | package mqttimport (	"context"	"fmt"	"kpt-pasture/config"	"kpt-pasture/util"	"os"	"os/signal"	"syscall"	"time"	handleMqtt "kpt-pasture/module/mqtt"	"gitee.com/xuyiping_admin/pkg/logger/zaplog"	golangMqtt "github.com/eclipse/paho.mqtt.golang"	mqtt "github.com/eclipse/paho.mqtt.golang"	"go.uber.org/zap")//var Module = di.Options(di.Provide(NewServer))type IMqttClient struct {	golangMqtt.Client	Config config.MqttSetting}type IMqttServer interface {	Consumer(func([]byte))	Producer(top string, qos int32, data []byte) error	Run(handleMqtt.Entry)	Close()}var (	maxRetryAttempts = 500             // 最大重试次数	initialRetryWait = 5 * time.Minute // 初始重试间隔	maxRetryWait     = 1 * time.Hour   // 最大重试间隔)var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))}var connectHandler = func(client golangMqtt.Client) {	zaplog.Info("connectHandler", zap.Any("client", client))}var connectLostHandler = func(client golangMqtt.Client, err error) {	zaplog.Error("connectLost-err", zap.Any("err", err.Error()))	retryConnect(client, 0)}// 重试连接函数func retryConnect(client golangMqtt.Client, attempt int) {	if attempt >= maxRetryAttempts {		zaplog.Error("Max retry attempts reached, giving up")		return	}	// 计算重试间隔(指数退避)	retryWait := initialRetryWait * time.Duration(1<<attempt) // 2^attempt 指数增长	if retryWait > maxRetryWait {		retryWait = maxRetryWait	}	zaplog.Info("Retrying connection", zap.Any("attempt", attempt+1), zap.Any("waitTime", retryWait))	time.Sleep(retryWait)	// 尝试重新连接	token := client.Connect()	if token.Wait() && token.Error() == nil {		zaplog.Info("Reconnected successfully")		connectHandler(client)		return	}	zaplog.Error("Connection retry failed", zap.Any("err", token.Error()))	retryConnect(client, attempt+1) // 递归调用,继续重试}func NewServer(config *config.AppConfig) IMqttServer {	conf := config.Mqtt	opts := golangMqtt.NewClientOptions()	opts.AddBroker(fmt.Sprintf("tcp://%s", conf.Broker))	opts.SetClientID(util.GenerateRandomNumberString(16))	opts.SetUsername(conf.UserName)	opts.SetPassword(conf.Password)	opts.SetCleanSession(false)	opts.SetConnectRetry(true)	opts.SetConnectRetryInterval(5 * time.Minute)	opts.SetKeepAlive(time.Second * time.Duration(conf.KeepAlive))	opts.SetAutoReconnect(conf.AutoReconnect)	opts.SetDefaultPublishHandler(messagePubHandler)	opts.OnConnect = connectHandler	opts.OnConnectionLost = connectLostHandler	client := golangMqtt.NewClient(opts)	if token := client.Connect(); token.Wait() && token.Error() != nil {		retryConnect(client, 0)	}	return &IMqttClient{Client: client, Config: conf}}func (s *IMqttClient) Close() {	if s.Client.IsConnected() {		s.Client.Disconnect(250)	}}func (s *IMqttClient) Run(enter handleMqtt.Entry) {	// 设置信号监听以优雅关闭服务器	stop := make(chan os.Signal, 1)	signal.Notify(stop, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)	// 创建上下文,用于优雅关闭	ctx, cancel := context.WithCancel(context.Background())	defer cancel()	go func() {		for {			select {			// 等待停止信号			case <-stop:				cancel()				s.Close()				return			case <-ctx.Done():				return			}		}	}()	// 订阅主题	buffer := bufferPool.Get().([]byte)	if token := s.Client.Subscribe(s.Config.SubTopic, byte(s.Config.Qos), func(client mqtt.Client, msg mqtt.Message) {		copy(buffer, msg.Payload())		select {		case readMsgChan <- buffer[:len(msg.Payload())]:			bufferPool.Put(buffer)		case <-ctx.Done():			return		}	}); token.Wait() && token.Error() != nil {		zaplog.Error("Consumer", zap.Any("topic", s.Config.SubTopic), zap.Any("err", token.Error()))	}	// 启动数据处理	s.Consumer(enter.NeckRingHandle)}
 |