package mqtt import ( "encoding/json" "fmt" "kpt-pasture/model" "kpt-pasture/util" "math" "strconv" "strings" "sync" "time" "github.com/jinzhu/copier" pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow" "gitee.com/xuyiping_admin/pkg/logger/zaplog" "gitee.com/xuyiping_admin/pkg/xerr" "go.uber.org/zap" "gorm.io/gorm" ) type DataInsertNeckRingLog struct { NeckRingOriginalData []*model.NeckRingOriginal NeckRingErrorData []*model.NeckRingError NeckRingUnRegisterData []*model.NeckRingUnRegister Mx *sync.RWMutex } var ( batchSize = 10 batchList = make([]*model.NeckRingOriginal, 0, batchSize) DefaultLimit = int32(10000) DSMLog = &DataInsertNeckRingLog{ NeckRingOriginalData: make([]*model.NeckRingOriginal, 0), NeckRingErrorData: make([]*model.NeckRingError, 0), NeckRingUnRegisterData: make([]*model.NeckRingUnRegister, 0), Mx: &sync.RWMutex{}, } ) func (e *Entry) NeckRingHandle(data []byte) { newData := e.MsgDataFormat2(data) if newData == nil || len(newData) <= 0 { return } zaplog.Info("NeckRingHandle", zap.Any("data", newData), zap.Any("original", string(data)), zap.Any("time", time.Now().Unix())) batchList = append(batchList, newData...) if len(batchList) >= batchSize { e.processBatch(batchList) batchList = batchList[:0] // 清空 batchList } } // 处理批量数据 func (e *Entry) processBatch(batchList []*model.NeckRingOriginal) { // 初始化分类数据 var ( errorData []*model.NeckRingError originalData []*model.NeckRingOriginal ) // 分类数据 for _, batch := range batchList { // 异常脖环数据 if ok := util.IsValidFrameId(batch.Frameid); !ok { var ed model.NeckRingError copier.Copy(&ed, &batch) errorData = append(errorData, &ed) } else { originalData = append(originalData, batch) } } // 更新日志 DSMLog.Mx.Lock() defer DSMLog.Mx.Unlock() DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, errorData...) DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, originalData...) // 写入数据 if err := e.CreatedData(DSMLog); err != nil { zaplog.Error("Failed to create data", zap.Any("err", err), zap.Any("dataList", DSMLog)) } // 清空日志 DSMLog.NeckRingErrorData = DSMLog.NeckRingErrorData[:0] DSMLog.NeckRingOriginalData = DSMLog.NeckRingOriginalData[:0] } func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error { if err := e.DB.Transaction(func(tx *gorm.DB) error { if len(DSMLog.NeckRingErrorData) > 0 { if err := e.DB.Create(DSMLog.NeckRingErrorData).Error; err != nil { return xerr.WithStack(err) } } if len(DSMLog.NeckRingOriginalData) > 0 { if err := e.DB.Create(DSMLog.NeckRingOriginalData).Error; err != nil { return xerr.WithStack(err) } } return nil }); err != nil { return xerr.WithStack(err) } return nil } func (e *Entry) MsgDataFormat(msg []byte) []*model.NeckRingOriginal { msgData := make(map[string]interface{}) pairs := strings.Split(util.MsgFormat(string(msg)), " ") for _, pair := range pairs { parts := strings.SplitN(pair, ":", 2) if len(parts) != 2 { continue } key, value := parts[0], parts[1] if len(key) == 0 { continue } msgData[key] = value } softVer := int64(0) if softVerInter, ok := msgData["SOFT_VER"]; ok { if softVerstr, ok := softVerInter.(string); ok { softVer, _ = strconv.ParseInt(softVerstr, 10, 64) } } if softVer <= 0 { if softVerInter, ok := msgData["soft_ver"]; ok { if softVerstr, ok := softVerInter.(string); ok { softVer, _ = strconv.ParseInt(softVerstr, 10, 64) } } } uuid := "" if uuidInter, ok := msgData["uuid"]; ok { if uuidStr, ok := uuidInter.(string); ok { uuid = uuidStr } } frameId := int64(0) if frameIdInter, ok := msgData["frameid"]; ok { if frameId64, ok := frameIdInter.(string); ok { frameId, _ = strconv.ParseInt(frameId64, 10, 64) } } temp := float64(0) if tempInter, ok := msgData["Temp"]; ok { if tempFloat, ok := tempInter.(string); ok { temp, _ = strconv.ParseFloat(tempFloat, 64) } } if temp <= 0 { if tempInter, ok := msgData["temp"]; ok { if tempFloat, ok := tempInter.(string); ok { temp, _ = strconv.ParseFloat(tempFloat, 64) } } } imei := "" if imeiInter, ok := msgData["imei"]; ok { if imeiStr, ok := imeiInter.(string); ok { imei = imeiStr } } active := int64(0) if activeInter, ok := msgData["active"]; ok { if active32, ok := activeInter.(string); ok { active, _ = strconv.ParseInt(active32, 10, 64) } } inAction := int64(0) if inActionInter, ok := msgData["inactive"]; ok { if inAction32, ok := inActionInter.(string); ok { inAction, _ = strconv.ParseInt(inAction32, 10, 64) } } ruMina := int64(0) if ruMinaInter, ok := msgData["Rumina"]; ok { if ruMina32, ok := ruMinaInter.(string); ok { ruMina, _ = strconv.ParseInt(ruMina32, 10, 64) } } if ruMina <= 0 { if ruMinaInter, ok := msgData["rumina"]; ok { if ruMina32, ok := ruMinaInter.(string); ok { ruMina, _ = strconv.ParseInt(ruMina32, 10, 64) } } } intake := int64(0) if intakeInter, ok := msgData["Intake"]; ok { if intake32, ok := intakeInter.(string); ok { intake, _ = strconv.ParseInt(intake32, 10, 64) } } if intake <= 0 { if intakeInter, ok := msgData["intake"]; ok { if intake32, ok := intakeInter.(string); ok { intake, _ = strconv.ParseInt(intake32, 10, 64) } } } gasp := int64(0) if gaspInter, ok := msgData["gasp"]; ok { if gasp32, ok := gaspInter.(string); ok { gasp, _ = strconv.ParseInt(gasp32, 10, 64) } } reMain := int64(0) if reMainInter, ok := msgData["Remain"]; ok { if reMain32, ok := reMainInter.(string); ok { reMain, _ = strconv.ParseInt(reMain32, 10, 64) } } if ruMina <= 0 { if reMainInter, ok := msgData["remain"]; ok { if reMain32, ok := reMainInter.(string); ok { reMain, _ = strconv.ParseInt(reMain32, 10, 64) } } } /*cowId := "" if cowIdInter, ok := msgData["cowid"]; ok { if cowIdStr, ok := cowIdInter.(string); ok { cowId = cowIdStr } } csq := int64(0) if csqInter, ok := msgData["csq"]; ok { if csq32, ok := csqInter.(string); ok { csq, _ = strconv.ParseInt(csq32, 10, 64) } } other := int64(0) if otherInter, ok := msgData["other"]; ok { if other32, ok := otherInter.(string); ok { other, _ = strconv.ParseInt(other32, 10, 64) } } nccId := "" if nccIdInter, ok := msgData["nccid"]; ok { if nccIdStr, ok := nccIdInter.(string); ok { nccId = nccIdStr } } */ return []*model.NeckRingOriginal{ { FirmwareVersion: int32(softVer), Uuid: uuid, Frameid: int32(frameId), ReceiveNumber: imei, Active: int32(active), Inactive: int32(inAction), Rumina: int32(ruMina), Intake: int32(intake), Gasp: int32(gasp), Remain: int32(reMain), }, } } func (e *Entry) MsgDataFormat2(msg []byte) []*model.NeckRingOriginal { neckLogList := &NeckRingWrapper{} if err := json.Unmarshal(msg, neckLogList); err != nil { zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg))) } res := make([]*model.NeckRingOriginal, 0) for _, neckLog := range neckLogList.NeckRing.NeckPck { activeDate, hours := util.GetNeckRingActiveTimer(neckLog.Frameid) activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes if neckLog.Frameid%10 == 8 { activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours } res = append(res, &model.NeckRingOriginal{ Uuid: neckLog.UUID, NeckRingNumber: fmt.Sprintf("%d", neckLog.Ecowid), ActiveDate: activeDate, Hours: int32(hours), Frameid: neckLog.Frameid, Rumina: neckLog.Rumina, Intake: neckLog.Intake, Inactive: neckLog.Inactive, Gasp: neckLog.Other, High: neckLog.Activitys, Active: neckLog.High, FirmwareVersion: neckLog.Sver, HardwareVersion: neckLog.Hver, Remain: neckLog.Remain, Voltage: neckLog.BAT, RestartReason: neckLog.STATUS, Upper: neckLog.UpPer, Imei: neckLog.Imei, ReceiveNumber: neckLog.Imei, ActiveDateType: activeDateTimeType, IsShow: pasturePb.IsShow_No, }) } return res } // NeckRingOriginalMergeData 把脖环数据合并成2个小时的 func (e *Entry) NeckRingOriginalMergeData() { limit := e.Cfg.NeckRingLimit if limit <= 0 { limit = DefaultLimit } updateOriginalMaxId := e.GetSystemConfigure(model.MaxEstrus).Value neckRingList := make([]*model.NeckRingOriginal, 0) if err := e.DB.Model(new(model.NeckRingOriginal)). Where("id > ?", updateOriginalMaxId). Order("id asc").Limit(int(limit)). Find(&neckRingList).Error; err != nil { return } if len(neckRingList) <= 0 { return } originalMapData := make(map[string]*model.NeckRingOriginalMerge) // 合并成2个小时的 for _, v := range neckRingList { xframeId := int(math.Floor(float64(v.Frameid)/10) * 2) mapKey := fmt.Sprintf("%s%s%s%s%d", v.NeckRingNumber, model.JoinKey, v.ActiveDate, model.JoinKey, xframeId) // 0001/2023-12-04/0 0001/2023-12-03/4 if _, ok := originalMapData[mapKey]; !ok { originalMapData[mapKey] = new(model.NeckRingOriginalMerge) } v.IsAvgHours() originalMapData[mapKey].IsMageData(v) } // 算平均值 for _, v := range originalMapData { v.SumAvg() } // 更新脖环牛只相关信息 newNeckActiveHabitList := model.NeckRingOriginalMap(originalMapData).ForMatData() if err := e.DB.Transaction(func(tx *gorm.DB) error { // 更新已处理过的id if err := tx.Model(new(model.SystemConfigure)). Where("name = ?", model.UpdateOriginalMaxId). Update("value", neckRingList[len(neckRingList)-1].Id). Error; err != nil { return xerr.WithStack(err) } for _, neckActiveHabit := range newNeckActiveHabitList { // 新数据直接插入 todo 待优化 historyNeckActiveHabit, ct := e.IsExistNeckActiveHabit(neckActiveHabit.NeckRingNumber, neckActiveHabit.HeatDate, neckActiveHabit.Frameid) if ct <= 0 { if err := tx.Create(neckActiveHabit).Error; err != nil { return xerr.WithStack(err) } continue } if historyNeckActiveHabit == nil { zaplog.Error("NeckRingOriginalMergeData", zap.Any("historyNeckActiveHabit", historyNeckActiveHabit)) continue } // 更新数据 historyNeckActiveHabit.MergeData(neckActiveHabit) if err := tx.Model(new(model.NeckActiveHabit)). Select("rumina", "intake", "inactive", "gasp", "other", "high", "active"). Where("id = ?", historyNeckActiveHabit.Id). Updates(historyNeckActiveHabit).Error; err != nil { return xerr.WithStack(err) } } return nil }); err != nil { zaplog.Error("NeckRingOriginalMergeData", zap.Any("transaction", err)) return } }