Browse Source

connection: reset by peer

Yi 3 months ago
parent
commit
0cb5df4312
5 changed files with 117 additions and 84 deletions
  1. 8 0
      .drone.yml
  2. 3 5
      cmd/mqtt.go
  3. 1 0
      model/sub_msg_log.go
  4. 2 3
      module/mqtt/interface.go
  5. 103 76
      module/mqtt/sub.go

+ 8 - 0
.drone.yml

@@ -2,7 +2,15 @@ kind: pipeline
 type: docker
 name: kptTemporaryMqtt
 
+clone:
+  depth: 1
+  disable: true
 steps:
+  - name: clone
+    image: alpine/git
+    commands:
+      - git clone -b master http://192.168.1.8:3000/xuyiping/kpt-temporary-mqtt.git
+      - cp -R kpt-temporary-mqtt/* ./
   - name: build
     image: plugins/docker:20.14.2
     volumes:

+ 3 - 5
cmd/mqtt.go

@@ -2,7 +2,6 @@ package cmd
 
 import (
 	"github.com/spf13/cobra"
-	"kpt-temporary-mqtt/config"
 	"kpt-temporary-mqtt/dep"
 )
 
@@ -11,9 +10,8 @@ var mqttCmd = &cobra.Command{
 	Use:   "mqtt",
 	Short: "mqtt",
 	Run: func(cmd *cobra.Command, args []string) {
-		configOption := config.Options()
-		dataEvent := dep.DIMqtt()
-		dataEvent.DataEventEntry.NewMqtt(configOption)
-		dataEvent.DataEventEntry.SubMsgLog(configOption)
+		dataEvent := dep.DIMqtt().DataEventEntry
+		dataEvent.NewMqtt()
+		dataEvent.SubMsgLog()
 	},
 }

+ 1 - 0
model/sub_msg_log.go

@@ -16,6 +16,7 @@ type SubMsgLog struct {
 	Gasp      int32  `json:"gasp"`
 	Other     int32  `json:"other"`
 	ReMain    int32  `json:"remain"`
+	Nccid     string `json:"nccid"`
 	CreatedAt int64  `json:"createdAt"`
 	UpdatedAt int64  `json:"updatedAt"`
 }

+ 2 - 3
module/mqtt/interface.go

@@ -3,7 +3,6 @@ package mqtt
 import (
 	"gitee.com/xuyiping_admin/pkg/di"
 	"go.uber.org/dig"
-	"kpt-temporary-mqtt/config"
 	"kpt-temporary-mqtt/store/kptstore"
 )
 
@@ -12,8 +11,8 @@ var Module = di.Options(
 )
 
 type DataEvent interface {
-	NewMqtt(configOption *config.AppConfig)
-	SubMsgLog(configOption *config.AppConfig)
+	NewMqtt()
+	SubMsgLog()
 }
 
 type DataEventEntry struct {

+ 103 - 76
module/mqtt/sub.go

@@ -17,7 +17,11 @@ import (
 	"time"
 )
 
-var golangMqttClient golangMqtt.Client
+var (
+	configOption     = config.Options()
+	golangMqttClient golangMqtt.Client
+	subMsgChan       = make(chan []byte, configOption.WorkNumber)
+)
 
 var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
 	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
@@ -25,16 +29,33 @@ var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client,
 
 var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
 	zaplog.Info("connectedClient", zap.Any("client", client))
+	client.Subscribe(configOption.SubTopName, 1, func(client golangMqtt.Client, msg golangMqtt.Message) {
+		subMsgChan <- msg.Payload()
+	})
 }
 
 // 连接丢失处理
 var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
-	zaplog.Error("connectLost", zap.Any("err", err.Error()))
-	ConnectionRetry()
+	zaplog.Error("connectLost", zap.Any("err", err.Error()), zap.Any("golangMqttClient", golangMqttClient))
+	connectionRetry(client)
+	zaplog.Info("connectLost", zap.Any("ConnectionRetry", "ok"), zap.Any("golangMqttClient", golangMqttClient))
+}
+
+// connectionRetry 尝试重新连接
+func connectionRetry(client golangMqtt.Client) {
+	for {
+		token := client.Connect()
+		if token.Wait() && token.Error() == nil {
+			// 成功重连,更新全局客户端实例
+			golangMqttClient = client
+			return
+		}
+		zaplog.Error("ConnectionRetry", zap.Any("err", token.Error()))
+		time.Sleep(5 * time.Second)
+	}
 }
 
-// ConnectionRetry 尝试重新连接
-func ConnectionRetry() {
+func (d *DataEventEntry) NewMqtt() {
 	options := config.Options()
 	opts := golangMqtt.NewClientOptions().
 		AddBroker(fmt.Sprintf("tcp://%s:%d", options.Broker, options.Port)).
@@ -46,34 +67,17 @@ func ConnectionRetry() {
 	opts.SetKeepAlive(2 * time.Minute)
 	opts.SetAutoReconnect(true)
 	opts.SetConnectRetry(true)
+	opts.SetCleanSession(false)
+	opts.OnConnect = connectHandler
+	opts.OnConnectionLost = connectLostHandler
 
 	newClient := golangMqtt.NewClient(opts)
 	if token := newClient.Connect(); token.Wait() && token.Error() == nil {
 		// 成功重连,更新全局客户端实例
 		golangMqttClient = newClient
-		return // 退出重连循环
 	} else {
-		zaplog.Error("ConnectionRetry", zap.Any("err", token.Error()))
-	}
-}
-
-func (d *DataEventEntry) NewMqtt(configOption *config.AppConfig) {
-	opts := golangMqtt.NewClientOptions()
-	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", configOption.Broker, configOption.Port))
-	opts.SetClientID(util.RandString(12))
-	opts.SetUsername(configOption.UserName)
-	opts.SetPassword(configOption.Password)
-	opts.SetKeepAlive(2 * time.Minute)
-	opts.SetAutoReconnect(true)
-	opts.SetConnectRetry(true)
-	opts.SetDefaultPublishHandler(messagePubHandler)
-	opts.OnConnect = connectHandler
-	opts.OnConnectionLost = connectLostHandler
-	client := golangMqtt.NewClient(opts)
-	if token := client.Connect(); token.Wait() && token.Error() != nil {
 		panic(token.Error())
 	}
-	golangMqttClient = client
 }
 
 type DataInsertSubMsgLog struct {
@@ -81,12 +85,7 @@ type DataInsertSubMsgLog struct {
 	Mx            *sync.RWMutex
 }
 
-func (d *DataEventEntry) SubMsgLog(configOption *config.AppConfig) {
-	var subMsgChan = make(chan []byte, configOption.WorkNumber)
-	golangMqttClient.Subscribe(configOption.SubTopName, 1, func(client golangMqtt.Client, msg golangMqtt.Message) {
-		subMsgChan <- msg.Payload()
-	})
-
+func (d *DataEventEntry) SubMsgLog() {
 	DSMLog := DataInsertSubMsgLog{
 		SubMsgLogList: make([]*model.SubMsgLog, 0),
 		Mx:            &sync.RWMutex{},
@@ -100,27 +99,14 @@ func (d *DataEventEntry) SubMsgLog(configOption *config.AppConfig) {
 
 	// 设置2分钟超时
 	tc := time.After(2 * time.Minute)
-
-	go func() {
-		for {
-			select {
-			case <-tc:
-				if !golangMqttClient.IsConnectionOpen() || !golangMqttClient.IsConnected() {
-					zaplog.Info("IsConnectionOpen")
-					ConnectionRetry()
-				}
-			}
-		}
-	}()
-
-	//for i := 0; i < int(configOption.WorkNumber); i++ {
-	//go func() {
 	for {
 		select {
 		case msg := <-subMsgChan:
 			subMsLog := d.CreatMsgLog(msg)
 			if subMsLog != nil {
 				batchList = append(batchList, subMsLog)
+			} else {
+				zaplog.Error("subMsgChan-imei", zap.Any("subMsLog", string(msg)))
 			}
 			if len(batchList) >= batchSize {
 				DSMLog.Mx.Lock()
@@ -136,55 +122,38 @@ func (d *DataEventEntry) SubMsgLog(configOption *config.AppConfig) {
 			}
 		// 优雅退出
 		case <-sc:
-			if len(DSMLog.SubMsgLogList) > 0 {
-				DSMLog.Mx.Lock()
-				if err := d.DB.Create(DSMLog.SubMsgLogList).Error; err != nil {
-					zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("subMsgLog", DSMLog.SubMsgLogList))
-				}
-				zaplog.Info("subMsgChan-os", zap.Any("success", DSMLog.SubMsgLogList))
-				DSMLog.SubMsgLogList = make([]*model.SubMsgLog, 0)
-				DSMLog.Mx.Unlock()
-			}
-			close(subMsgChan)
+			d.handleSignal(&DSMLog, subMsgChan, true)
 		case <-tc:
-			if len(DSMLog.SubMsgLogList) > 0 {
-				DSMLog.Mx.Lock()
-				if err := d.DB.Create(DSMLog.SubMsgLogList).Error; err != nil {
-					zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("subMsgLog", DSMLog.SubMsgLogList))
-				}
-				zaplog.Info("subMsgChan-os", zap.Any("success", DSMLog.SubMsgLogList))
-				DSMLog.SubMsgLogList = make([]*model.SubMsgLog, 0)
-				DSMLog.Mx.Unlock()
-			}
+			d.handleSignal(&DSMLog, subMsgChan, false)
 		}
 	}
-
-	//}()
-	//}
 }
 
+var isDelete bool
+
 func (d *DataEventEntry) CreatMsgLog(msg []byte) *model.SubMsgLog {
+	if len(msg) <= 0 {
+		return nil
+	}
 	defer func() {
-		if time.Now().Day()%15 == 0 {
+		if time.Now().Day()%15 == 0 && !isDelete {
 			d.DB.Model(new(model.SubMsgLog)).Where("created_at < ?", time.Now().AddDate(0, 0, -15).Unix()).Delete(new(model.SubMsgLog))
+			isDelete = true
 			return
 		}
 	}()
-	subMsgLog, err := d.MsgDataFormat(msg)
-	if err != nil {
-		zaplog.Error("CreatMsgLog", zap.Any("err", err), zap.Any("msg", string(msg)))
-	}
+	subMsgLog := d.MsgDataFormat(msg)
 	if subMsgLog == nil {
 		return nil
 	}
+
 	if subMsgLog.Imei == "" {
-		zaplog.Info("CreatMsgLog", zap.Any("msg", string(msg)), zap.Any("subMsgLog", subMsgLog))
 		return nil
 	}
 	return subMsgLog
 }
 
-func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.SubMsgLog, error) {
+func (d *DataEventEntry) MsgDataFormat(msg []byte) *model.SubMsgLog {
 	msgData := make(map[string]interface{})
 	pairs := strings.Split(util.MsgFormat(string(msg)), " ")
 	for _, pair := range pairs {
@@ -206,6 +175,14 @@ func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.SubMsgLog, error) {
 		}
 	}
 
+	if softVer <= 0 {
+		if softVerInter, ok := msgData["soft_ver"]; ok {
+			if softVerstr, ok := softVerInter.(string); ok {
+				softVer, _ = strconv.ParseInt(softVerstr, 10, 64)
+			}
+		}
+	}
+
 	uuid := ""
 	if uuidInter, ok := msgData["uuid"]; ok {
 		if uuidStr, ok := uuidInter.(string); ok {
@@ -240,6 +217,14 @@ func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.SubMsgLog, error) {
 		}
 	}
 
+	if temp <= 0 {
+		if tempInter, ok := msgData["temp"]; ok {
+			if tempFloat, ok := tempInter.(string); ok {
+				temp, _ = strconv.ParseFloat(tempFloat, 64)
+			}
+		}
+	}
+
 	imei := ""
 	if imeiInter, ok := msgData["imei"]; ok {
 		if imeiStr, ok := imeiInter.(string); ok {
@@ -267,6 +252,13 @@ func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.SubMsgLog, error) {
 			ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
 		}
 	}
+	if ruMina <= 0 {
+		if ruMinaInter, ok := msgData["rumina"]; ok {
+			if ruMina32, ok := ruMinaInter.(string); ok {
+				ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
+			}
+		}
+	}
 
 	intake := int64(0)
 	if intakeInter, ok := msgData["Intake"]; ok {
@@ -275,6 +267,12 @@ func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.SubMsgLog, error) {
 		}
 	}
 
+	if intakeInter, ok := msgData["intake"]; ok {
+		if intake32, ok := intakeInter.(string); ok {
+			intake, _ = strconv.ParseInt(intake32, 10, 64)
+		}
+	}
+
 	gasp := int64(0)
 	if gaspInter, ok := msgData["gasp"]; ok {
 		if gasp32, ok := gaspInter.(string); ok {
@@ -296,6 +294,19 @@ func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.SubMsgLog, error) {
 		}
 	}
 
+	if reMainInter, ok := msgData["remain"]; ok {
+		if reMain32, ok := reMainInter.(string); ok {
+			reMain, _ = strconv.ParseInt(reMain32, 10, 64)
+		}
+	}
+
+	nccId := ""
+	if nccIdInter, ok := msgData["nccid"]; ok {
+		if nccIdStr, ok := nccIdInter.(string); ok {
+			nccId = nccIdStr
+		}
+	}
+
 	return &model.SubMsgLog{
 		SoftVer:  softVer,
 		Uuid:     uuid,
@@ -311,6 +322,22 @@ func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.SubMsgLog, error) {
 		Gasp:     int32(gasp),
 		Other:    int32(other),
 		ReMain:   int32(reMain),
-	}, nil
+		Nccid:    nccId,
+	}
+}
 
+func (d *DataEventEntry) handleSignal(dsmLog *DataInsertSubMsgLog, subMsgChan chan []byte, isCloseChan bool) {
+	if len(dsmLog.SubMsgLogList) > 0 {
+		dsmLog.Mx.Lock()
+		if err := d.DB.Create(dsmLog.SubMsgLogList).Error; err != nil {
+			zaplog.Error("handleSignal", zap.Error(err))
+		}
+		zaplog.Info("handleSignal", zap.Any("success", dsmLog.SubMsgLogList))
+		dsmLog.SubMsgLogList = make([]*model.SubMsgLog, 0)
+		dsmLog.Mx.Unlock()
+	}
+	if isCloseChan {
+		close(subMsgChan)
+		golangMqttClient.Disconnect(250)
+	}
 }