123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- package mqtt
- import (
- "context"
- "fmt"
- "kpt-pasture/config"
- "kpt-pasture/util"
- "os"
- "os/signal"
- "syscall"
- "time"
- handleMqtt "kpt-pasture/module/mqtt"
- "gitee.com/xuyiping_admin/pkg/logger/zaplog"
- golangMqtt "github.com/eclipse/paho.mqtt.golang"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "go.uber.org/zap"
- )
- //var Module = di.Options(di.Provide(NewServer))
- type IMqttClient struct {
- golangMqtt.Client
- Config config.MqttSetting
- }
- type IMqttServer interface {
- Consumer(func([]byte))
- Producer(top string, qos int32, data []byte) error
- Run(handleMqtt.Entry)
- Close()
- }
// Tuning knobs for the manual reconnect loop in retryConnect.
var (
	// maxRetryAttempts is the maximum number of reconnect attempts.
	maxRetryAttempts = 500
	// initialRetryWait is the wait before the first retry.
	initialRetryWait = 5 * time.Minute
	// maxRetryWait is the upper bound on the wait between retries.
	maxRetryWait = time.Hour
)
- var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
- zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
- }
- var connectHandler = func(client golangMqtt.Client) {
- zaplog.Info("connectHandler", zap.Any("client", client))
- }
- var connectLostHandler = func(client golangMqtt.Client, err error) {
- zaplog.Error("connectLost-err", zap.Any("err", err.Error()))
- retryConnect(client, 0)
- }
- // 重试连接函数
- func retryConnect(client golangMqtt.Client, attempt int) {
- if attempt >= maxRetryAttempts {
- zaplog.Error("Max retry attempts reached, giving up")
- return
- }
- // 计算重试间隔(指数退避)
- retryWait := initialRetryWait * time.Duration(1<<attempt) // 2^attempt 指数增长
- if retryWait > maxRetryWait {
- retryWait = maxRetryWait
- }
- zaplog.Info("Retrying connection", zap.Any("attempt", attempt+1), zap.Any("waitTime", retryWait))
- time.Sleep(retryWait)
- // 尝试重新连接
- token := client.Connect()
- if token.Wait() && token.Error() == nil {
- zaplog.Info("Reconnected successfully")
- connectHandler(client)
- return
- }
- zaplog.Error("Connection retry failed", zap.Any("err", token.Error()))
- retryConnect(client, attempt+1) // 递归调用,继续重试
- }
- func NewServer(config *config.AppConfig) IMqttServer {
- conf := config.Mqtt
- opts := golangMqtt.NewClientOptions()
- opts.AddBroker(fmt.Sprintf("tcp://%s", conf.Broker))
- opts.SetClientID(util.GenerateRandomNumberString(16))
- opts.SetUsername(conf.UserName)
- opts.SetPassword(conf.Password)
- opts.SetCleanSession(false)
- opts.SetConnectRetry(true)
- opts.SetConnectRetryInterval(5 * time.Minute)
- opts.SetKeepAlive(time.Second * time.Duration(conf.KeepAlive))
- opts.SetAutoReconnect(conf.AutoReconnect)
- opts.SetDefaultPublishHandler(messagePubHandler)
- opts.OnConnect = connectHandler
- opts.OnConnectionLost = connectLostHandler
- client := golangMqtt.NewClient(opts)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- retryConnect(client, 0)
- }
- return &IMqttClient{Client: client, Config: conf}
- }
- func (s *IMqttClient) Close() {
- if s.Client.IsConnected() {
- s.Client.Disconnect(250)
- }
- }
- func (s *IMqttClient) Run(enter handleMqtt.Entry) {
- // 设置信号监听以优雅关闭服务器
- stop := make(chan os.Signal, 1)
- signal.Notify(stop, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
- // 创建上下文,用于优雅关闭
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go func() {
- for {
- select {
- // 等待停止信号
- case <-stop:
- cancel()
- s.Close()
- return
- case <-ctx.Done():
- return
- }
- }
- }()
- // 订阅主题
- buffer := bufferPool.Get().([]byte)
- if token := s.Client.Subscribe(s.Config.SubTopic, byte(s.Config.Qos), func(client mqtt.Client, msg mqtt.Message) {
- copy(buffer, msg.Payload())
- select {
- case readMsgChan <- buffer[:len(msg.Payload())]:
- bufferPool.Put(buffer)
- case <-ctx.Done():
- return
- }
- }); token.Wait() && token.Error() != nil {
- zaplog.Error("Consumer", zap.Any("topic", s.Config.SubTopic), zap.Any("err", token.Error()))
- }
- // 启动数据处理
- s.Consumer(enter.NeckRingHandle)
- }
|