package mqtt

import (
	"encoding/json"
	"strconv"
	"strings"
	"sync"
	"time"

	"kpt-pasture/model"
	"kpt-pasture/util"

	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
	"gitee.com/xuyiping_admin/pkg/xerr"
	"github.com/jinzhu/copier"
	"go.uber.org/zap"
	"gorm.io/gorm"
)

// DataInsertNeckRingLog groups the neck-ring records produced from one or
// more messages: records whose frame id passed util.IsValidFrameId and
// records whose frame id did not.
type DataInsertNeckRingLog struct {
	NeckRingOriginalData []*model.NeckRingOriginal
	NeckRingErrorData    []*model.NeckRingError
}

var (
	// DSMLog is a package-level scratch buffer used by processBatch.
	DSMLog = &DataInsertNeckRingLog{
		NeckRingOriginalData: make([]*model.NeckRingOriginal, 0),
		NeckRingErrorData:    make([]*model.NeckRingError, 0),
	}
	// pastureMqttMap caches neck ring number -> pasture id, filled lazily by
	// FindPastureMqttMap.
	pastureMqttMap = make(map[string]int64)
	// isFindPastureMqttMap reports whether pastureMqttMap has been loaded.
	isFindPastureMqttMap bool
	// mu serializes MsgDataFormat2.
	mu sync.Mutex
)

// NeckRingHandle decodes a raw message with MsgDataFormat2 and, when it
// yields at least one record, persists the result with CreatedData.
// Failures are logged; nothing is returned to the caller.
func (e *Entry) NeckRingHandle(data []byte) {
	newData := e.MsgDataFormat2(data)
	if newData == nil {
		return
	}
	if len(newData.NeckRingErrorData) == 0 && len(newData.NeckRingOriginalData) == 0 {
		return
	}

	// Write the decoded records.
	if err := e.CreatedData(newData); err != nil {
		zaplog.Error("Failed to create data", zap.Any("err", err), zap.Any("dataList", newData))
	}
}

// FindPastureMqttMap returns the cached neck ring number -> pasture id map,
// loading it from the database on first use.
//
// If the query fails the error is logged and the cache is NOT marked as
// loaded, so the next call retries instead of serving an empty map forever.
//
// The function does no locking of its own; MsgDataFormat2 calls it while
// holding mu.
func (e *Entry) FindPastureMqttMap() map[string]int64 {
	if isFindPastureMqttMap {
		return pastureMqttMap
	}

	// NOTE(review): the query targets model.AppMqtt but scans into
	// []*model.NeckRing — confirm this table/struct pairing is intended.
	appMqttList := make([]*model.NeckRing, 0)
	if err := e.DB.Model(new(model.AppMqtt)).Find(&appMqttList).Error; err != nil {
		zaplog.Error("FindPastureMqttMap", zap.Any("err", err))
		return pastureMqttMap
	}

	for _, v := range appMqttList {
		pastureMqttMap[v.NeckRingNumber] = v.PastureId
	}
	isFindPastureMqttMap = true
	return pastureMqttMap
}

// processBatch splits batchList into valid and invalid records (by
// util.IsValidFrameId on the frame id), appends them to DSMLog, writes DSMLog
// with CreatedData, and then truncates DSMLog's slices to length zero.
func (e *Entry) processBatch(batchList []*model.NeckRingOriginal) {
	var (
		errorData    []*model.NeckRingError
		originalData []*model.NeckRingOriginal
	)

	// Classify each record.
	for _, batch := range batchList {
		if util.IsValidFrameId(batch.Frameid) {
			originalData = append(originalData, batch)
			continue
		}

		// Invalid frame id: convert to an error record; skip it if the copy fails.
		var ed model.NeckRingError
		if err := copier.Copy(&ed, &batch); err != nil {
			zaplog.Error("processBatch", zap.Any("copier", err), zap.Any("data", batch))
			continue
		}
		errorData = append(errorData, &ed)
	}

	// Stage the classified records in the shared buffer.
	DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, errorData...)
	DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, originalData...)

	// Write the staged records.
	if err := e.CreatedData(DSMLog); err != nil {
		zaplog.Error("Failed to create data", zap.Any("err", err), zap.Any("dataList", DSMLog))
	}

	// Reset the buffer (keeps capacity) whether or not the write succeeded.
	DSMLog.NeckRingErrorData = DSMLog.NeckRingErrorData[:0]
	DSMLog.NeckRingOriginalData = DSMLog.NeckRingOriginalData[:0]
}

// CreatedData inserts the error records and the original records of DSMLog
// inside a single database transaction and returns the first error, wrapped
// with xerr.WithStack.
//
// All statements go through the transaction handle tx, so a failure in any
// insert rolls back the earlier ones. When there are more than 50 original
// records, the first 50 are inserted, the function pauses for 100ms, and the
// remainder is inserted in a second statement.
func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
	const (
		firstChunk = 50
		pause      = 100 * time.Millisecond
	)

	err := e.DB.Transaction(func(tx *gorm.DB) error {
		if len(DSMLog.NeckRingErrorData) > 0 {
			if err := tx.Model(new(model.NeckRingError)).Create(DSMLog.NeckRingErrorData).Error; err != nil {
				return xerr.WithStack(err)
			}
		}

		originals := DSMLog.NeckRingOriginalData
		if len(originals) > firstChunk {
			if err := tx.Model(new(model.NeckRingOriginal)).Create(originals[:firstChunk]).Error; err != nil {
				return xerr.WithStack(err)
			}
			time.Sleep(pause)
			originals = originals[firstChunk:]
		}
		if len(originals) > 0 {
			if err := tx.Model(new(model.NeckRingOriginal)).Create(originals).Error; err != nil {
				return xerr.WithStack(err)
			}
		}
		return nil
	})
	if err != nil {
		return xerr.WithStack(err)
	}
	return nil
}

// MsgDataFormat2 decodes a JSON message into a model.NeckRingWrapper and
// converts every entry of NeckRing.NeckPck into either an original record
// (valid frame id) or an error record (invalid frame id).
//
// It returns nil when the payload is not valid JSON or when the message type
// is "heartbeat". The whole call runs under the package mutex mu.
func (e *Entry) MsgDataFormat2(msg []byte) *DataInsertNeckRingLog {
	mu.Lock()
	defer mu.Unlock()

	neckLogList := &model.NeckRingWrapper{}
	if err := json.Unmarshal(msg, neckLogList); err != nil {
		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
		return nil
	}
	if neckLogList.Type == "heartbeat" {
		return nil
	}

	normalOriginal := make([]*model.NeckRingOriginal, 0)
	errorOriginal := make([]*model.NeckRingError, 0)

	// Ensure the pasture map cache is populated; the returned map is the
	// package-level pastureMqttMap itself.
	e.FindPastureMqttMap()

	for _, neckLog := range neckLogList.NeckRing.NeckPck {
		newOriginal := model.NewNeckRingOriginal(neckLog)

		if !util.IsValidFrameId(neckLog.Frameid) {
			var ed model.NeckRingError
			if err := copier.Copy(&ed, &newOriginal); err != nil {
				zaplog.Error("MsgDataFormat2", zap.Any("copier", err), zap.Any("neckLog", neckLog))
				continue
			}
			errorOriginal = append(errorOriginal, &ed)
			continue
		}

		activeDate, hours := util.GetNeckRingActiveTimer(neckLog.Frameid)
		newOriginal.ActiveDate = activeDate
		newOriginal.Hours = int32(hours)
		normalOriginal = append(normalOriginal, newOriginal)
	}

	return &DataInsertNeckRingLog{
		NeckRingErrorData:    errorOriginal,
		NeckRingOriginalData: normalOriginal,
	}
}

// MsgDataFormat parses a space-separated "key:value" message (after
// util.MsgFormat normalization) into a single-element slice holding one
// model.NeckRingOriginal.
//
// Pairs without a ':' or with an empty key are ignored. Several fields are
// looked up under a capitalized key first and fall back to the lowercase key
// when the first lookup did not yield a positive number. Unparseable numbers
// are not reported; the field takes whatever strconv.ParseInt returned.
func (e *Entry) MsgDataFormat(msg []byte) []*model.NeckRingOriginal {
	fields := make(map[string]string)
	for _, pair := range strings.Split(util.MsgFormat(string(msg)), " ") {
		parts := strings.SplitN(pair, ":", 2)
		if len(parts) != 2 || len(parts[0]) == 0 {
			continue
		}
		fields[parts[0]] = parts[1]
	}

	// intField reads the first key; each later key is consulted only while
	// the value so far is not positive, and overwrites it when present.
	intField := func(keys ...string) int64 {
		var v int64
		for i, key := range keys {
			if i > 0 && v > 0 {
				break
			}
			if raw, ok := fields[key]; ok {
				// Parse errors are intentionally ignored (best-effort decoding).
				v, _ = strconv.ParseInt(raw, 10, 64)
			}
		}
		return v
	}

	// NOTE(review): the Temp/temp, cowid, csq, other and nccid keys are not
	// mapped into the record — confirm whether model.NeckRingOriginal should
	// carry them.
	return []*model.NeckRingOriginal{
		{
			FirmwareVersion: int32(intField("SOFT_VER", "soft_ver")),
			Uuid:            fields["uuid"],
			Frameid:         int32(intField("frameid")),
			ReceiveNumber:   fields["imei"],
			Active:          int32(intField("active")),
			Inactive:        int32(intField("inactive")),
			Rumina:          int32(intField("Rumina", "rumina")),
			Intake:          int32(intField("Intake", "intake")),
			Gasp:            int32(intField("gasp")),
			// The lowercase fallback is gated on the remain value itself.
			Remain: int32(intField("Remain", "remain")),
		},
	}
}