Преглед на файлове

mqtt: 断联重试机制

Yi преди 3 месеца
родител
ревизия
9594c4afc7
променени са 3 файла, в които са добавени 55 реда и са изтрити 13 реда
  1. 2 2
      cmd/mqtt.go
  2. 2 3
      module/mqtt/interface.go
  3. 51 8
      module/mqtt/sub.go

+ 2 - 2
cmd/mqtt.go

@@ -13,7 +13,7 @@ var mqttCmd = &cobra.Command{
 	Run: func(cmd *cobra.Command, args []string) {
 		configOption := config.Options()
 		dataEvent := dep.DIMqtt()
-		mqttClient := dataEvent.DataEventEntry.NewMqtt(configOption)
-		dataEvent.DataEventEntry.SubMsgLog(configOption, mqttClient)
+		dataEvent.DataEventEntry.NewMqtt(configOption)
+		dataEvent.DataEventEntry.SubMsgLog(configOption)
 	},
 }

+ 2 - 3
module/mqtt/interface.go

@@ -2,7 +2,6 @@ package mqtt
 
 import (
 	"gitee.com/xuyiping_admin/pkg/di"
-	golangMqtt "github.com/eclipse/paho.mqtt.golang"
 	"go.uber.org/dig"
 	"kpt-temporary-mqtt/config"
 	"kpt-temporary-mqtt/store/kptstore"
@@ -13,8 +12,8 @@ var Module = di.Options(
 )
 
 type DataEvent interface {
-	NewMqtt(configOption *config.AppConfig) golangMqtt.Client
-	SubMsgLog(configOption *config.AppConfig, client golangMqtt.Client)
+	NewMqtt(configOption *config.AppConfig)
+	SubMsgLog(configOption *config.AppConfig)
 }
 
 type DataEventEntry struct {

+ 51 - 8
module/mqtt/sub.go

@@ -17,6 +17,8 @@ import (
 	"time"
 )
 
+var golangMqttClient golangMqtt.Client
+
 var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
 	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
 }
@@ -25,16 +27,45 @@ var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client)
 	zaplog.Info("connectedClient", zap.Any("client", client))
 }
 
+// 连接丢失处理
 var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
-	zaplog.Info("connectLost", zap.Any("err", err.Error()))
+	zaplog.Error("connectLost", zap.Any("err", err.Error()))
+	ConnectionRetry()
+}
+
+// ConnectionRetry 尝试重新连接
+func ConnectionRetry() {
+	options := config.Options()
+	opts := golangMqtt.NewClientOptions().
+		AddBroker(fmt.Sprintf("tcp://%s:%d", options.Broker, options.Port)).
+		SetClientID(util.RandString(12)).
+		SetUsername(options.UserName).
+		SetPassword(options.Password).
+		SetDefaultPublishHandler(messagePubHandler)
+	opts.SetMaxReconnectInterval(5 * time.Minute)
+	opts.SetKeepAlive(2 * time.Minute)
+	opts.SetAutoReconnect(true)
+	opts.SetConnectRetry(true)
+
+	newClient := golangMqtt.NewClient(opts)
+	if token := newClient.Connect(); token.Wait() && token.Error() == nil {
+		// 成功重连,更新全局客户端实例
+		golangMqttClient = newClient
+		return // 退出重连循环
+	} else {
+		zaplog.Error("ConnectionRetry", zap.Any("err", token.Error()))
+	}
 }
 
-func (d *DataEventEntry) NewMqtt(configOption *config.AppConfig) golangMqtt.Client {
+func (d *DataEventEntry) NewMqtt(configOption *config.AppConfig) {
 	opts := golangMqtt.NewClientOptions()
 	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", configOption.Broker, configOption.Port))
-	opts.SetClientID(util.RandString(6))
+	opts.SetClientID(util.RandString(12))
 	opts.SetUsername(configOption.UserName)
 	opts.SetPassword(configOption.Password)
+	opts.SetKeepAlive(2 * time.Minute)
+	opts.SetAutoReconnect(true)
+	opts.SetConnectRetry(true)
 	opts.SetDefaultPublishHandler(messagePubHandler)
 	opts.OnConnect = connectHandler
 	opts.OnConnectionLost = connectLostHandler
@@ -42,7 +73,7 @@ func (d *DataEventEntry) NewMqtt(configOption *config.AppConfig) golangMqtt.Clie
 	if token := client.Connect(); token.Wait() && token.Error() != nil {
 		panic(token.Error())
 	}
-	return client
+	golangMqttClient = client
 }
 
 type DataInsertSubMsgLog struct {
@@ -50,9 +81,9 @@ type DataInsertSubMsgLog struct {
 	Mx            *sync.RWMutex
 }
 
-func (d *DataEventEntry) SubMsgLog(configOption *config.AppConfig, client golangMqtt.Client) {
+func (d *DataEventEntry) SubMsgLog(configOption *config.AppConfig) {
 	var subMsgChan = make(chan []byte, configOption.WorkNumber)
-	client.Subscribe(configOption.SubTopName, 1, func(client golangMqtt.Client, msg golangMqtt.Message) {
+	golangMqttClient.Subscribe(configOption.SubTopName, 1, func(client golangMqtt.Client, msg golangMqtt.Message) {
 		subMsgChan <- msg.Payload()
 	})
 
@@ -67,8 +98,20 @@ func (d *DataEventEntry) SubMsgLog(configOption *config.AppConfig, client golang
 	sc := make(chan os.Signal, 1)
 	signal.Notify(sc, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
 
-	// 设置5分钟超时
-	tc := time.After(5 * time.Minute)
+	// 设置2分钟超时
+	tc := time.After(2 * time.Minute)
+
+	go func() {
+		for {
+			select {
+			case <-tc:
+				if !golangMqttClient.IsConnectionOpen() || !golangMqttClient.IsConnected() {
+					zaplog.Info("IsConnectionOpen")
+					ConnectionRetry()
+				}
+			}
+		}
+	}()
 
 	//for i := 0; i < int(configOption.WorkNumber); i++ {
 	//go func() {