| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 | 
							- package mqtt
 
- import (
 
- 	"context"
 
- 	"fmt"
 
- 	"kpt-pasture/config"
 
- 	"kpt-pasture/util"
 
- 	"os"
 
- 	"os/signal"
 
- 	"syscall"
 
- 	"time"
 
- 	handleMqtt "kpt-pasture/module/mqtt"
 
- 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
 
- 	golangMqtt "github.com/eclipse/paho.mqtt.golang"
 
- 	mqtt "github.com/eclipse/paho.mqtt.golang"
 
- 	"go.uber.org/zap"
 
- )
 
- //var Module = di.Options(di.Provide(NewServer))
 
- type IMqttClient struct {
 
- 	golangMqtt.Client
 
- 	Config config.MqttSetting
 
- }
 
- type IMqttServer interface {
 
- 	Consumer(func([]byte))
 
- 	Producer(top string, qos int32, data []byte) error
 
- 	Run(handleMqtt.Entry)
 
- 	Close()
 
- }
 
// Settings that drive the manual reconnect loop in retryConnect.
var (
	// maxRetryAttempts is the maximum number of reconnect attempts.
	maxRetryAttempts int = 500

	// initialRetryWait is the wait before the first reconnect attempt.
	initialRetryWait time.Duration = 5 * time.Minute

	// maxRetryWait is the upper bound for the wait between attempts.
	maxRetryWait time.Duration = 1 * time.Hour
)
 
- var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
 
- 	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
 
- }
 
- var connectHandler = func(client golangMqtt.Client) {
 
- 	zaplog.Info("connectHandler", zap.Any("client", client))
 
- }
 
- var connectLostHandler = func(client golangMqtt.Client, err error) {
 
- 	zaplog.Error("connectLost-err", zap.Any("err", err.Error()))
 
- 	retryConnect(client, 0)
 
- }
 
- // 重试连接函数
 
- func retryConnect(client golangMqtt.Client, attempt int) {
 
- 	if attempt >= maxRetryAttempts {
 
- 		zaplog.Error("Max retry attempts reached, giving up")
 
- 		return
 
- 	}
 
- 	// 计算重试间隔(指数退避)
 
- 	retryWait := initialRetryWait * time.Duration(1<<attempt) // 2^attempt 指数增长
 
- 	if retryWait > maxRetryWait {
 
- 		retryWait = maxRetryWait
 
- 	}
 
- 	zaplog.Info("Retrying connection", zap.Any("attempt", attempt+1), zap.Any("waitTime", retryWait))
 
- 	time.Sleep(retryWait)
 
- 	// 尝试重新连接
 
- 	token := client.Connect()
 
- 	if token.Wait() && token.Error() == nil {
 
- 		zaplog.Info("Reconnected successfully")
 
- 		connectHandler(client)
 
- 		return
 
- 	}
 
- 	zaplog.Error("Connection retry failed", zap.Any("err", token.Error()))
 
- 	retryConnect(client, attempt+1) // 递归调用,继续重试
 
- }
 
- func NewServer(config *config.AppConfig) IMqttServer {
 
- 	conf := config.Mqtt
 
- 	opts := golangMqtt.NewClientOptions()
 
- 	opts.AddBroker(fmt.Sprintf("tcp://%s", conf.Broker))
 
- 	opts.SetClientID(util.GenerateRandomNumberString(16))
 
- 	opts.SetUsername(conf.UserName)
 
- 	opts.SetPassword(conf.Password)
 
- 	opts.SetCleanSession(false)
 
- 	opts.SetConnectRetry(true)
 
- 	opts.SetConnectRetryInterval(5 * time.Minute)
 
- 	opts.SetKeepAlive(time.Second * time.Duration(conf.KeepAlive))
 
- 	opts.SetAutoReconnect(conf.AutoReconnect)
 
- 	opts.SetDefaultPublishHandler(messagePubHandler)
 
- 	opts.OnConnect = connectHandler
 
- 	opts.OnConnectionLost = connectLostHandler
 
- 	client := golangMqtt.NewClient(opts)
 
- 	if token := client.Connect(); token.Wait() && token.Error() != nil {
 
- 		retryConnect(client, 0)
 
- 	}
 
- 	return &IMqttClient{Client: client, Config: conf}
 
- }
 
- func (s *IMqttClient) Close() {
 
- 	if s.Client.IsConnected() {
 
- 		s.Client.Disconnect(250)
 
- 	}
 
- }
 
- func (s *IMqttClient) Run(enter handleMqtt.Entry) {
 
- 	// 设置信号监听以优雅关闭服务器
 
- 	stop := make(chan os.Signal, 1)
 
- 	signal.Notify(stop, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
 
- 	// 创建上下文,用于优雅关闭
 
- 	ctx, cancel := context.WithCancel(context.Background())
 
- 	defer cancel()
 
- 	go func() {
 
- 		for {
 
- 			select {
 
- 			// 等待停止信号
 
- 			case <-stop:
 
- 				cancel()
 
- 				s.Close()
 
- 				return
 
- 			case <-ctx.Done():
 
- 				return
 
- 			}
 
- 		}
 
- 	}()
 
- 	// 订阅主题
 
- 	buffer := bufferPool.Get().([]byte)
 
- 	if token := s.Client.Subscribe(s.Config.SubTopic, byte(s.Config.Qos), func(client mqtt.Client, msg mqtt.Message) {
 
- 		copy(buffer, msg.Payload())
 
- 		select {
 
- 		case readMsgChan <- buffer[:len(msg.Payload())]:
 
- 			bufferPool.Put(buffer)
 
- 		case <-ctx.Done():
 
- 			return
 
- 		}
 
- 	}); token.Wait() && token.Error() != nil {
 
- 		zaplog.Error("Consumer", zap.Any("topic", s.Config.SubTopic), zap.Any("err", token.Error()))
 
- 	}
 
- 	// 启动数据处理
 
- 	go enter.InitReceiverMapUpdater()
 
- 	s.Consumer(enter.NeckRingHandle)
 
- }
 
 
  |