package mqtt

import (
	"encoding/json"
	"fmt"
	"kpt-pasture/model"
	"kpt-pasture/util"
	"strconv"
	"strings"
	"time"

	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
	"github.com/jinzhu/copier"

	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
	"gitee.com/xuyiping_admin/pkg/xerr"
	"go.uber.org/zap"
	"gorm.io/gorm"
)

// DataInsertNeckRingLog groups the rows produced from neck-ring messages:
// records with a valid frame id and records that failed frame id validation.
type DataInsertNeckRingLog struct {
	NeckRingOriginalData []*model.NeckRingOriginal
	NeckRingErrorData    []*model.NeckRingError
}

// NOTE(review): the variables below are shared, unsynchronized package state.
// Nothing in this file guards them; confirm the callers never run the
// handlers concurrently.
var (
	batchSize    = 10
	batchList    = make([]*model.NeckRingOriginal, 0, batchSize)
	defaultLimit = int32(1000)

	// DSMLog is the scratch buffer processBatch fills, persists and then empties.
	DSMLog = &DataInsertNeckRingLog{
		NeckRingOriginalData: make([]*model.NeckRingOriginal, 0),
		NeckRingErrorData:    make([]*model.NeckRingError, 0),
	}

	// isDelete records that the retention purge already ran on the current purge day.
	isDelete bool

	// pastureMqttMap maps AppMqtt.ReceiveNumber to AppMqtt.PastureId.
	pastureMqttMap = make(map[string]int64)
	// isFindPastureMqttMap is true once pastureMqttMap was loaded successfully.
	isFindPastureMqttMap bool
)

// NeckRingHandle decodes one JSON neck-ring message and stores the resulting
// original and error records. Messages that cannot be decoded, heartbeat
// messages and messages without records are dropped.
func (e *Entry) NeckRingHandle(data []byte) {
	newData := e.MsgDataFormat2(data)
	if newData == nil {
		return
	}
	if len(newData.NeckRingErrorData) == 0 && len(newData.NeckRingOriginalData) == 0 {
		return
	}

	// Persist the data.
	if err := e.CreatedData(newData); err != nil {
		zaplog.Error("Failed to create data", zap.Any("err", err), zap.Any("dataList", newData))
	}
}

// NeckRingOriginalMergeData merges raw neck-ring records into 2-hour
// NeckActiveHabit rows.
//
// It loads up to Cfg.NeckRingLimit (defaultLimit when unset) records with
// is_show <= IsShow_No that were created at least one hour ago, merges them
// with recalculate and, inside one transaction, inserts new habit rows or
// refreshes existing ones. When it returns (even after a failed transaction)
// it advances the UpdateOriginalMax cursor, marks the covered id range as
// shown and, on days of the month divisible by 15, purges old data.
func (e *Entry) NeckRingOriginalMergeData() {
	limit := e.Cfg.NeckRingLimit
	if limit <= 0 {
		limit = defaultLimit
	}

	mergeDataMaxId := e.GetSystemConfigure(model.UpdateOriginalMax).Value
	newTime := time.Now()
	createdAt := newTime.Add(-1 * time.Hour)

	neckRingList := make([]*model.NeckRingOriginal, 0)
	if err := e.DB.Model(new(model.NeckRingOriginal)).
		Where("is_show <= ?", pasturePb.IsShow_No).
		Where("created_at <= ?", createdAt.Unix()).
		Order("neck_ring_number,active_date,frameid").
		Limit(int(limit)).
		Find(&neckRingList).Error; err != nil {
		zaplog.Error("NeckRingOriginalMergeData", zap.Any("find", err))
		return
	}
	if len(neckRingList) == 0 {
		return
	}

	// NOTE(review): the list is ordered by neck_ring_number/active_date/frameid,
	// not by id, so the last row does not necessarily carry the largest id.
	// Confirm this is the intended cursor value.
	newMergeDataMaxId := neckRingList[len(neckRingList)-1].Id
	if newMergeDataMaxId <= mergeDataMaxId {
		return
	}

	defer func() {
		if newMergeDataMaxId > 0 {
			if err := e.DB.Model(new(model.SystemConfigure)).
				Where("name = ?", model.UpdateOriginalMax).
				Update("value", newMergeDataMaxId).Error; err != nil {
				zaplog.Error("NeckRingOriginalMergeData", zap.Any("updateMaxId", err))
			}
			if err := e.DB.Model(new(model.NeckRingOriginal)).
				Where("id BETWEEN ? AND ?", mergeDataMaxId, newMergeDataMaxId).
				Update("is_show", pasturePb.IsShow_Ok).Error; err != nil {
				zaplog.Error("NeckRingOriginalMergeData", zap.Any("updateIsShow", err))
			}
		}

		// Re-arm the purge outside purge days; otherwise it would run only
		// once for the whole lifetime of the process.
		if newTime.Day()%15 != 0 {
			isDelete = false
			return
		}
		if isDelete {
			return
		}

		// Raw data: delete everything older than 15 days.
		if err := e.DB.Model(new(model.NeckRingOriginal)).
			Where("created_at < ?", newTime.AddDate(0, 0, -15).Unix()).
			Delete(new(model.NeckRingOriginal)).Error; err != nil {
			zaplog.Error("NeckRingOriginalMergeData", zap.Any("deleteOriginal", err))
		}
		// Activity data: delete everything older than 6 months.
		if err := e.DB.Model(new(model.NeckActiveHabit)).
			Where("created_at < ?", newTime.AddDate(0, -6, 0).Unix()).
			Delete(new(model.NeckActiveHabit)).Error; err != nil {
			zaplog.Error("NeckRingOriginalMergeData", zap.Any("deleteActiveHabit", err))
		}
		isDelete = true
	}()

	// Merge and compute.
	neckActiveHabitList := e.recalculate(neckRingList)
	if len(neckActiveHabitList) == 0 {
		return
	}

	err := e.DB.Transaction(func(tx *gorm.DB) error {
		for _, neckActiveHabit := range neckActiveHabitList {
			historyNeckActiveHabit, ct := e.IsExistNeckActiveHabit(
				neckActiveHabit.NeckRingNumber,
				neckActiveHabit.HeatDate,
				neckActiveHabit.Frameid,
			)

			// New data is inserted directly.
			if ct <= 0 {
				// Skip data from neck rings that are not bound to a cow.
				cowInfo := e.GetCowInfoByNeckRingNumber(neckActiveHabit.PastureId, neckActiveHabit.NeckRingNumber)
				if cowInfo == nil || cowInfo.Id <= 0 {
					continue
				}
				neckActiveHabit.CowId = cowInfo.Id
				neckActiveHabit.Lact = cowInfo.Lact
				neckActiveHabit.CalvingAge = int32(cowInfo.CalvingAge)

				if err := tx.Create(neckActiveHabit).Error; err != nil {
					return xerr.WithStack(err)
				}
				if err := tx.Create(model.NewNeckRingProcess(neckActiveHabit)).Error; err != nil {
					return xerr.WithStack(err)
				}
				// Best effort: a failure here is logged and does not abort the transaction.
				// NOTE(review): this helper writes through e.DB, not tx, so the
				// update is not rolled back with the transaction.
				if err := e.UpdateNeckRingOriginalIsShow(neckActiveHabit); err != nil {
					zaplog.Error("UpdateNeckRingOriginalIsShow",
						zap.Any("err", err),
						zap.Any("neckActiveHabit", neckActiveHabit))
				}
				continue
			}

			if historyNeckActiveHabit == nil || historyNeckActiveHabit.Id <= 0 {
				zaplog.Error("NeckRingOriginalMergeData",
					zap.Any("historyNeckActiveHabit", historyNeckActiveHabit),
					zap.Any("ct", ct),
					zap.Any("neckActiveHabit", neckActiveHabit))
				continue
			}

			// An entry already exists: recompute it from the stored raw records.
			newNeckActiveHabit := e.againRecalculate(historyNeckActiveHabit)
			if newNeckActiveHabit == nil {
				continue
			}
			if err := tx.Model(new(model.NeckActiveHabit)).
				Select("rumina", "intake", "inactive", "gasp", "other", "high", "active").
				Where("id = ?", historyNeckActiveHabit.Id).
				Updates(newNeckActiveHabit).Error; err != nil {
				return xerr.WithStack(err)
			}
		}
		return nil
	})
	if err != nil {
		zaplog.Error("NeckRingOriginalMergeData", zap.Any("transaction", err))
	}
}

// FindPastureMqttMap returns the ReceiveNumber -> PastureId mapping built from
// the AppMqtt table. The mapping is loaded on first use and then served from
// the package-level cache. A failed load is logged and not cached, so the next
// call queries again.
func (e *Entry) FindPastureMqttMap() map[string]int64 {
	if isFindPastureMqttMap {
		return pastureMqttMap
	}

	appMqttList := make([]*model.AppMqtt, 0)
	if err := e.DB.Model(new(model.AppMqtt)).Find(&appMqttList).Error; err != nil {
		zaplog.Error("FindPastureMqttMap", zap.Any("err", err))
		// Do not mark the cache as loaded: caching here would pin an empty
		// mapping for the rest of the process lifetime.
		return pastureMqttMap
	}

	for _, v := range appMqttList {
		pastureMqttMap[v.ReceiveNumber] = v.PastureId
	}
	isFindPastureMqttMap = true
	return pastureMqttMap
}

// processBatch classifies a batch of records by frame id validity, appends
// them to DSMLog, persists DSMLog and finally empties it, whether or not the
// write succeeded.
func (e *Entry) processBatch(batchList []*model.NeckRingOriginal) {
	var (
		errorData    []*model.NeckRingError
		originalData []*model.NeckRingOriginal
	)

	// Classify the data.
	for _, batch := range batchList {
		if util.IsValidFrameId(batch.Frameid) {
			originalData = append(originalData, batch)
			continue
		}

		// Abnormal neck-ring data.
		var ed model.NeckRingError
		if err := copier.Copy(&ed, &batch); err != nil {
			zaplog.Error("processBatch", zap.Any("copier", err), zap.Any("data", batch))
			continue
		}
		errorData = append(errorData, &ed)
	}

	// Update the log buffer.
	DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, errorData...)
	DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, originalData...)

	// Persist the data.
	if err := e.CreatedData(DSMLog); err != nil {
		zaplog.Error("Failed to create data", zap.Any("err", err), zap.Any("dataList", DSMLog))
	}

	// Empty the log buffer, keeping the allocated capacity.
	DSMLog.NeckRingErrorData = DSMLog.NeckRingErrorData[:0]
	DSMLog.NeckRingOriginalData = DSMLog.NeckRingOriginalData[:0]
}

// CreatedData inserts the error records and the original records of DSMLog in
// a single transaction, so either both inserts are committed or neither is.
// A nil DSMLog is a no-op. The returned error is wrapped with xerr.WithStack.
func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
	if DSMLog == nil {
		return nil
	}

	err := e.DB.Transaction(func(tx *gorm.DB) error {
		// Every write must go through tx; writing through e.DB would run
		// outside the transaction and could not be rolled back.
		if len(DSMLog.NeckRingErrorData) > 0 {
			if err := tx.Model(new(model.NeckRingError)).Create(DSMLog.NeckRingErrorData).Error; err != nil {
				return err
			}
		}
		if len(DSMLog.NeckRingOriginalData) > 0 {
			if err := tx.Model(new(model.NeckRingOriginal)).Create(DSMLog.NeckRingOriginalData).Error; err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return xerr.WithStack(err)
	}
	return nil
}

// MsgDataFormat2 decodes a JSON neck-ring message into original and error
// records. It returns nil when the payload cannot be unmarshalled or when the
// message type is "heartbeat". Records with an invalid frame id are copied
// into NeckRingError; valid ones get ActiveDate and Hours filled from
// util.GetNeckRingActiveTimer.
func (e *Entry) MsgDataFormat2(msg []byte) *DataInsertNeckRingLog {
	neckLogList := &model.NeckRingWrapper{}
	if err := json.Unmarshal(msg, neckLogList); err != nil {
		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
		return nil
	}
	if neckLogList.Type == "heartbeat" {
		return nil
	}

	normalOriginal := make([]*model.NeckRingOriginal, 0)
	errorOriginal := make([]*model.NeckRingError, 0)
	receiverPastures := e.FindPastureMqttMap()

	for _, neckLog := range neckLogList.NeckRing.NeckPck {
		newOriginal := model.NewNeckRingOriginal(neckLog, receiverPastures)

		if !util.IsValidFrameId(neckLog.Frameid) {
			var ed model.NeckRingError
			if err := copier.Copy(&ed, &newOriginal); err != nil {
				zaplog.Error("MsgDataFormat2", zap.Any("copier", err), zap.Any("neckLog", neckLog))
				continue
			}
			errorOriginal = append(errorOriginal, &ed)
			continue
		}

		activeDate, hours := util.GetNeckRingActiveTimer(neckLog.Frameid)
		newOriginal.ActiveDate = activeDate
		newOriginal.Hours = int32(hours)
		normalOriginal = append(normalOriginal, newOriginal)
	}

	return &DataInsertNeckRingLog{
		NeckRingErrorData:    errorOriginal,
		NeckRingOriginalData: normalOriginal,
	}
}

// recalculate merges raw records into 2-hour NeckActiveHabit rows.
//
// Records are grouped by neck ring number, active date and util.XFrameId of
// the frame id, for example 0001/2023-12-04/0 or 0001/2023-12-03/4. A group
// that does not hold exactly 6 records is dropped when the newest stored
// x-frame id for that ring and date is at most one ahead of the group's.
func (e *Entry) recalculate(neckRingList []*model.NeckRingOriginal) []*model.NeckActiveHabit {
	originalMapData := make(map[string]*model.NeckRingOriginalMerge)

	// Merge into 2-hour groups.
	for _, v := range neckRingList {
		xframeId := util.XFrameId(v.Frameid)
		mapKey := fmt.Sprintf("%s%s%s%s%d", v.NeckRingNumber, model.JoinKey, v.ActiveDate, model.JoinKey, xframeId)
		merged, ok := originalMapData[mapKey]
		if !ok || merged == nil {
			merged = new(model.NeckRingOriginalMerge)
			originalMapData[mapKey] = merged
		}
		merged.IsMageData(v, xframeId)
	}

	// Compute the averages.
	for k, v := range originalMapData {
		// Filter out merged groups that do not hold the full 6 records.
		if v.RecordCount != 6 {
			maxFrameId := e.CurrentMaxXFrameId(v.NeckRingNumber, v.ActiveDate)
			currXframeId := util.XFrameId(maxFrameId)
			if currXframeId-v.XframeId <= 1 {
				// Deleting the current key while ranging over a map is safe in Go.
				delete(originalMapData, k)
				// A dropped group never reaches the output, so skip its averages.
				continue
			}
		}
		v.SumAvg()
	}

	return model.NeckRingOriginalMap(originalMapData).ForMatData()
}

// againRecalculate rebuilds one NeckActiveHabit from the raw records stored
// for its neck ring, heat date and util.FrameIds(data.Frameid). It returns nil
// when the query fails or when the merge does not yield exactly one row.
func (e *Entry) againRecalculate(data *model.NeckActiveHabit) *model.NeckActiveHabit {
	originalList := make([]*model.NeckRingOriginal, 0)
	frameIds := util.FrameIds(data.Frameid)

	if err := e.DB.Model(new(model.NeckRingOriginal)).
		Where("neck_ring_number = ?", data.NeckRingNumber).
		Where("active_date = ?", data.HeatDate).
		Where("frameid IN (?)", frameIds).
		Find(&originalList).Error; err != nil {
		zaplog.Error("againRecalculate", zap.Any("err", err), zap.Any("data", data))
		return nil
	}

	newDataList := e.recalculate(originalList)
	if len(newDataList) != 1 {
		return nil
	}
	return newDataList[0]
}

// CurrentMaxXFrameId returns the largest frameid stored for the given neck
// ring and active date. It returns 0 when the query fails, which includes the
// case where no matching record exists.
func (e *Entry) CurrentMaxXFrameId(neckRingNumber, activeDate string) (frameid int32) {
	type Fm struct {
		Frameid int32
	}

	maxFx := &Fm{}
	if err := e.DB.Model(new(model.NeckRingOriginal)).
		Select("frameid").
		Where("neck_ring_number = ?", neckRingNumber).
		Where("active_date = ?", activeDate).
		Order("frameid DESC").
		First(maxFx).Error; err != nil {
		zaplog.Error("CurrentMaxXFrameId", zap.Any("err", err))
		return 0
	}
	return maxFx.Frameid
}

// UpdateNeckRingOriginalIsShow sets is_show to IsShow_Ok on the raw records
// matching the habit's neck ring number, heat date and
// util.FrameIds(neckRingList.Frameid). The returned error is wrapped with
// xerr.WithStack.
func (e *Entry) UpdateNeckRingOriginalIsShow(neckRingList *model.NeckActiveHabit) error {
	err := e.DB.Model(new(model.NeckRingOriginal)).
		Where("neck_ring_number = ?", neckRingList.NeckRingNumber).
		Where("active_date = ?", neckRingList.HeatDate).
		Where("frameid IN (?)", util.FrameIds(neckRingList.Frameid)).
		Update("is_show", pasturePb.IsShow_Ok).Error
	if err != nil {
		return xerr.WithStack(err)
	}
	return nil
}

// MsgDataFormat parses a text neck-ring message made of space-separated
// "key:value" pairs (after util.MsgFormat) into a single NeckRingOriginal.
//
// Pairs without a colon or with an empty key are ignored. Numeric values that
// fail to parse become 0. For SOFT_VER, Rumina, Intake and Remain the
// lower-case key is consulted when the primary key yields a value <= 0.
// Values are narrowed to int32 without range checking.
func (e *Entry) MsgDataFormat(msg []byte) []*model.NeckRingOriginal {
	fields := make(map[string]string)
	for _, pair := range strings.Split(util.MsgFormat(string(msg)), " ") {
		parts := strings.SplitN(pair, ":", 2)
		if len(parts) != 2 || len(parts[0]) == 0 {
			continue
		}
		fields[parts[0]] = parts[1]
	}

	// intField reads primary as a base-10 integer; when the result is <= 0 and
	// a fallback key is given and present, the fallback value replaces it.
	// Parse errors are deliberately ignored and leave strconv's result in place.
	intField := func(primary, fallback string) int64 {
		var n int64
		if s, ok := fields[primary]; ok {
			n, _ = strconv.ParseInt(s, 10, 64)
		}
		if n <= 0 && fallback != "" {
			if s, ok := fields[fallback]; ok {
				n, _ = strconv.ParseInt(s, 10, 64)
			}
		}
		return n
	}

	// Temp/temp is not extracted: none of the fields populated below stores it.
	// cowid, csq, other and nccid are not extracted either.
	return []*model.NeckRingOriginal{
		{
			FirmwareVersion: int32(intField("SOFT_VER", "soft_ver")),
			Uuid:            fields["uuid"],
			Frameid:         int32(intField("frameid", "")),
			ReceiveNumber:   fields["imei"],
			Active:          int32(intField("active", "")),
			Inactive:        int32(intField("inactive", "")),
			Rumina:          int32(intField("Rumina", "rumina")),
			Intake:          int32(intField("Intake", "intake")),
			Gasp:            int32(intField("gasp", "")),
			// The fallback is gated on Remain itself, not on Rumina.
			Remain: int32(intField("Remain", "remain")),
		},
	}
}