|
@@ -3,7 +3,14 @@ package mqtt
|
|
|
import (
|
|
|
"fmt"
|
|
|
"kpt-pasture/config"
|
|
|
+ "kpt-pasture/model"
|
|
|
+ "kpt-pasture/util"
|
|
|
+ "strconv"
|
|
|
+ "strings"
|
|
|
"sync"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
|
|
|
|
|
|
"gitee.com/xuyiping_admin/pkg/logger/zaplog"
|
|
|
golangMqtt "github.com/eclipse/paho.mqtt.golang"
|
|
@@ -59,16 +66,194 @@ func (d *DataEventEntry) SubMsg(conf config.MqttSetting, client golangMqtt.Clien
|
|
|
}
|
|
|
|
|
|
defer close(subMsgChan)
|
|
|
- for {
|
|
|
- select {
|
|
|
- case msg := <-subMsgChan:
|
|
|
- bufferPool.Put(msg)
|
|
|
- d.ProcessMessages(msg)
|
|
|
+ select {
|
|
|
+ case msg := <-subMsgChan:
|
|
|
+ bufferPool.Put(msg)
|
|
|
+ d.ProcessMessages(msg)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (d *DataEventEntry) ProcessMessages(msg []byte) {
|
|
|
+ neckRingOriginalData, err := d.MsgDataFormat(msg)
|
|
|
+ if err != nil {
|
|
|
+ zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if neckRingOriginalData == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if neckRingOriginalData.Imei == "" {
|
|
|
+ zaplog.Info("neckRingOriginalData", zap.Any("msg", string(msg)), zap.Any("neckRingOriginalData", neckRingOriginalData))
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ defer func() {
|
|
|
+ if time.Now().Day()%15 == 0 {
|
|
|
+ d.DB.Model(new(model.NeckRingOriginalData)).
|
|
|
+ Where("created_at < ?", time.Now().AddDate(-2, 0, 0).Unix()).
|
|
|
+ Delete(new(model.NeckRingOriginalData))
|
|
|
+ return
|
|
|
}
|
|
|
+ }()
|
|
|
+
|
|
|
+ // 计算牛只实际活动时间
|
|
|
+ nowDayTime := time.Now()
|
|
|
+ currHour := nowDayTime.Hour() + 2
|
|
|
+ frameIdHour := neckRingOriginalData.FrameId * 2
|
|
|
+ frameDayTime := fmt.Sprintf("%s %s:00:00", nowDayTime.Format(model.LayoutDate2), fmt.Sprintf("%02d", frameIdHour))
|
|
|
+ if frameIdHour > int64(currHour) {
|
|
|
+ frameDayTime = fmt.Sprintf("%s %s:00:00", nowDayTime.AddDate(0, 0, -1).Format(model.LayoutDate2), fmt.Sprintf("%02d", frameIdHour))
|
|
|
+ }
|
|
|
+
|
|
|
+ neckRingOriginalData.ActiveTime = frameDayTime
|
|
|
+ if err = d.DB.Create(neckRingOriginalData).Error; err != nil {
|
|
|
+ zaplog.Error("ProcessMessages", zap.Any("err", err), zap.Any("neckRingOriginalData", neckRingOriginalData))
|
|
|
}
|
|
|
|
|
|
+ // 更新脖环数据状态
|
|
|
+ neckRingStatus := pasturePb.NeckRingStatus_Normal
|
|
|
+ errorReason := ""
|
|
|
+ if neckRingOriginalData.FrameId >= 11 || neckRingOriginalData.FrameId < 0 {
|
|
|
+ neckRingStatus = pasturePb.NeckRingStatus_Error
|
|
|
+ errorReason = "数据异常"
|
|
|
+ }
|
|
|
+
|
|
|
+ d.DB.Model(new(model.NeckRingLog)).
|
|
|
+ Where("number = ?", neckRingOriginalData.Imei).
|
|
|
+ Updates(map[string]interface{}{
|
|
|
+ "status": neckRingStatus,
|
|
|
+ "error_reason": errorReason,
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
-func (d *DataEventEntry) ProcessMessages(msg []byte) {
|
|
|
- fmt.Println("===byte==", string(msg))
|
|
|
+func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.NeckRingOriginalData, error) {
|
|
|
+ msgData := make(map[string]interface{})
|
|
|
+ pairs := strings.Split(util.MsgFormat(string(msg)), " ")
|
|
|
+ for _, pair := range pairs {
|
|
|
+ parts := strings.SplitN(pair, ":", 2)
|
|
|
+ if len(parts) != 2 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ key, value := parts[0], parts[1]
|
|
|
+ if len(key) == 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ msgData[key] = value
|
|
|
+ }
|
|
|
+
|
|
|
+ softVer := int64(0)
|
|
|
+ if softVerInter, ok := msgData["SOFT_VER"]; ok {
|
|
|
+ if softVerstr, ok := softVerInter.(string); ok {
|
|
|
+ softVer, _ = strconv.ParseInt(softVerstr, 10, 64)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ uuid := ""
|
|
|
+ if uuidInter, ok := msgData["uuid"]; ok {
|
|
|
+ if uuidStr, ok := uuidInter.(string); ok {
|
|
|
+ uuid = uuidStr
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ frameId := int64(0)
|
|
|
+ if frameIdInter, ok := msgData["frameid"]; ok {
|
|
|
+ if frameId64, ok := frameIdInter.(string); ok {
|
|
|
+ frameId, _ = strconv.ParseInt(frameId64, 10, 64)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ cowId := ""
|
|
|
+ if cowIdInter, ok := msgData["cowid"]; ok {
|
|
|
+ if cowIdStr, ok := cowIdInter.(string); ok {
|
|
|
+ cowId = cowIdStr
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ csq := int64(0)
|
|
|
+ if csqInter, ok := msgData["csq"]; ok {
|
|
|
+ if csq32, ok := csqInter.(string); ok {
|
|
|
+ csq, _ = strconv.ParseInt(csq32, 10, 64)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ temp := float64(0)
|
|
|
+ if tempInter, ok := msgData["Temp"]; ok {
|
|
|
+ if tempFloat, ok := tempInter.(string); ok {
|
|
|
+ temp, _ = strconv.ParseFloat(tempFloat, 64)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ imei := ""
|
|
|
+ if imeiInter, ok := msgData["imei"]; ok {
|
|
|
+ if imeiStr, ok := imeiInter.(string); ok {
|
|
|
+ imei = imeiStr
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ active := int64(0)
|
|
|
+ if activeInter, ok := msgData["active"]; ok {
|
|
|
+ if active32, ok := activeInter.(string); ok {
|
|
|
+ active, _ = strconv.ParseInt(active32, 10, 64)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ inAction := int64(0)
|
|
|
+ if inActionInter, ok := msgData["inactive"]; ok {
|
|
|
+ if inAction32, ok := inActionInter.(string); ok {
|
|
|
+ inAction, _ = strconv.ParseInt(inAction32, 10, 64)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ ruMina := int64(0)
|
|
|
+ if ruMinaInter, ok := msgData["Rumina"]; ok {
|
|
|
+ if ruMina32, ok := ruMinaInter.(string); ok {
|
|
|
+ ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ intake := int64(0)
|
|
|
+ if intakeInter, ok := msgData["Intake"]; ok {
|
|
|
+ if intake32, ok := intakeInter.(string); ok {
|
|
|
+ intake, _ = strconv.ParseInt(intake32, 10, 64)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ gasp := int64(0)
|
|
|
+ if gaspInter, ok := msgData["gasp"]; ok {
|
|
|
+ if gasp32, ok := gaspInter.(string); ok {
|
|
|
+ gasp, _ = strconv.ParseInt(gasp32, 10, 64)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ other := int64(0)
|
|
|
+ if otherInter, ok := msgData["other"]; ok {
|
|
|
+ if other32, ok := otherInter.(string); ok {
|
|
|
+ other, _ = strconv.ParseInt(other32, 10, 64)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ reMain := int64(0)
|
|
|
+ if reMainInter, ok := msgData["Remain"]; ok {
|
|
|
+ if reMain32, ok := reMainInter.(string); ok {
|
|
|
+ reMain, _ = strconv.ParseInt(reMain32, 10, 64)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return &model.NeckRingOriginalData{
|
|
|
+ SoftVer: softVer,
|
|
|
+ Uuid: uuid,
|
|
|
+ FrameId: frameId,
|
|
|
+ CowId: cowId,
|
|
|
+ Csq: csq,
|
|
|
+ Temp: int64(temp * 100),
|
|
|
+ Imei: imei,
|
|
|
+ Active: int32(active),
|
|
|
+ InActive: int32(inAction),
|
|
|
+ RuMina: int32(ruMina),
|
|
|
+ Intake: int32(intake),
|
|
|
+ Gasp: int32(gasp),
|
|
|
+ Other: int32(other),
|
|
|
+ ReMain: int32(reMain),
|
|
|
+ }, nil
|
|
|
+
|
|
|
}
|