package crontab import ( "fmt" "kpt-pasture/model" "kpt-pasture/util" "math" "sort" "strings" "time" pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow" "gitee.com/xuyiping_admin/pkg/logger/zaplog" "gitee.com/xuyiping_admin/pkg/xerr" "go.uber.org/zap" ) const ( MinChangeFilter = -99 MinRuminaFilter = -99 MinChewFilter = -99 MinChangeHigh = -99 DefaultNb = 30 DefaultScore = 100 ) var ( defaultLimit = int32(1000) calculateIsRunning bool ) // NeckRingOriginalMerge 把脖环数据合并成2个小时的 func (e *Entry) NeckRingOriginalMerge() (err error) { if ok := e.IsExistCrontabLog(NeckRingOriginal); !ok { newTime := time.Now() e.CreateCrontabLog(NeckRingOriginal) // 原始数据删除15天前的 e.DB.Model(new(model.NeckRingOriginal)). Where("created_at < ?", newTime.AddDate(0, 0, -7).Unix()). Delete(new(model.NeckRingOriginal)) // 活动数据删除6个月前的数据 e.DB.Model(new(model.NeckActiveHabit)). Where("created_at < ?", newTime.AddDate(0, -2, 0).Unix()). Delete(new(model.NeckActiveHabit)) } pastureList := e.FindPastureList() if pastureList == nil || len(pastureList) == 0 { return nil } for _, pasture := range pastureList { if err = e.OriginalMergeData(pasture.Id); err != nil { zaplog.Error("NeckRingOriginalMerge", zap.Any("OriginalMergeData", err), zap.Any("pasture", pasture)) } } return nil } func (e *Entry) OriginalMergeData(pastureId int64) error { limit := e.Cfg.NeckRingLimit if limit <= 0 { limit = defaultLimit } neckRingList := make([]*model.NeckRingOriginal, 0) if err := e.DB.Model(new(model.NeckRingOriginal)). Where("is_show = ?", pasturePb.IsShow_No). Where("pasture_id = ?", pastureId). Limit(int(limit)). Find(&neckRingList).Error; err != nil { return xerr.WithStack(err) } if len(neckRingList) <= 0 { return nil } // 去重 neckRingList = RemoveDuplicates(neckRingList) // 计算合并 neckActiveHabitList := Recalculate(neckRingList) if len(neckActiveHabitList) <= 0 { return nil } for _, habit := range neckActiveHabitList { //更新脖环牛只相关信息 新数据直接插入 historyNeckActiveHabit, ct := e.IsExistNeckActiveHabit(pastureId, habit.NeckRingNumber, habit.HeatDate, habit.Frameid) if ct <= 0 { if err := e.DB.Create(habit).Error; err != nil { zaplog.Info("NeckRingOriginalMergeData-1", zap.Any("err", err), zap.Any("neckActiveHabit", habit), ) } } else { // 重新计算 newNeckActiveHabit := e.againRecalculate(historyNeckActiveHabit) if newNeckActiveHabit == nil { continue } if err := e.DB.Model(new(model.NeckActiveHabit)). Select("rumina", "intake", "inactive", "gasp", "other", "high", "active", "is_show", "record_count"). Where("id = ?", historyNeckActiveHabit.Id). Updates(newNeckActiveHabit).Error; err != nil { zaplog.Error("NeckRingOriginalMergeData-2", zap.Any("err", err), zap.Any("ct", ct), zap.Any("historyNeckActiveHabit", historyNeckActiveHabit), zap.Any("newNeckActiveHabit", newNeckActiveHabit), ) } } if err := e.UpdateNeckRingOriginalIsShow(habit); err != nil { zaplog.Error("NeckRingOriginalMergeData-4", zap.Any("err", err), zap.Any("neckActiveHabit", habit), ) } } return nil } func (e *Entry) UpdateNeckRingOriginalIsShow(habit *model.NeckActiveHabit) error { if err := e.DB.Model(new(model.NeckRingOriginal)). Where("pasture_id = ?", habit.PastureId). Where("neck_ring_number = ?", habit.NeckRingNumber). Where("active_date = ?", habit.HeatDate). Where("frameid IN (?)", util.FrameIds(habit.Frameid)). Update("is_show", pasturePb.IsShow_Ok).Error; err != nil { return xerr.WithStack(err) } return nil } // RemoveDuplicates 清洗一下数据,去掉重复的,如果有重复的,取最新的一条数据 func RemoveDuplicates(records []*model.NeckRingOriginal) []*model.NeckRingOriginal { uniqueRecords := make(map[string]*model.NeckRingOriginal) // 遍历原始数组 for _, record := range records { mapKey := fmt.Sprintf("%s%s%s%s%d", record.NeckRingNumber, model.JoinKey, record.ActiveDate, model.JoinKey, record.Frameid) // 0001/2023-12-04/0 0001/2023-12-03/4 if existing, exists := uniqueRecords[mapKey]; exists { if record.CreatedAt > existing.CreatedAt { uniqueRecords[mapKey] = record } } else { uniqueRecords[mapKey] = record } } // 将 map 中的记录转换为切片 result := make([]*model.NeckRingOriginal, 0, len(uniqueRecords)) for _, record := range uniqueRecords { result = append(result, record) } return result } // Recalculate 合并计算 func Recalculate(neckRingList []*model.NeckRingOriginal) []*model.NeckActiveHabit { originalMapData := make(map[string]*model.NeckRingOriginalMerge) // 合并成2个小时的 for _, v := range neckRingList { xframeId := util.XFrameId(v.Frameid) mapKey := fmt.Sprintf("%s%s%s%s%d", v.NeckRingNumber, model.JoinKey, v.ActiveDate, model.JoinKey, xframeId) // 0001/2023-12-04/0 0001/2023-12-03/4 if originalMapData[mapKey] == nil { originalMapData[mapKey] = new(model.NeckRingOriginalMerge) } originalMapData[mapKey].IsMageData(v, xframeId) } currTime := time.Now() res := make([]*model.NeckActiveHabit, 0) // 算平均值 for k, v := range originalMapData { // 过滤掉合并后<6条数据,如果时间太短就晚点再算 if v.RecordCount < model.DefaultRecordCount { currMaxXframeId := util.FrameIdMapReverse[int32(currTime.Hour())] activeDateString := fmt.Sprintf("%s %02d:00:00", v.ActiveDate, v.XframeId*2+1) activeDate, _ := util.TimeParseLocal(model.LayoutTime, activeDateString) if currMaxXframeId-v.XframeId <= 1 && currTime.Add(-1*time.Hour).Unix() < activeDate.Unix() { delete(originalMapData, k) continue } } v.SumAvg() } if len(originalMapData) <= 0 { return res } res = model.NeckRingOriginalMap(originalMapData).ForMatData() sort.Sort(model.NeckActiveHabitSlice(res)) return res } func (e *Entry) againRecalculate(data *model.NeckActiveHabit) *model.NeckActiveHabit { originalList := make([]*model.NeckRingOriginal, 0) frameIds := util.FrameIds(data.Frameid) sql := "" for _, frameId := range frameIds { sql += fmt.Sprintf(`SELECT * FROM neck_ring_original WHERE pasture_id = %d AND neck_ring_number = '%s' AND active_date = '%s' AND frameid = %d UNION ALL `, data.PastureId, data.NeckRingNumber, data.HeatDate, frameId) } if len(sql) > 0 { sql = strings.TrimSuffix(sql, "UNION ALL ") } if err := e.DB.Raw(sql).Find(&originalList).Error; err != nil { return nil } /*if err := e.DB.Model(new(model.NeckRingOriginal)). Where("pasture_id = ?", data.PastureId). Where("neck_ring_number = ?", data.NeckRingNumber). Where("active_date = ?", data.HeatDate). Where("frameid IN (?)", frameIds). Find(&originalList).Error; err != nil { return nil }*/ originalList = RemoveDuplicates(originalList) newDataList := Recalculate(originalList) if len(newDataList) != 1 { return nil } res := newDataList[0] res.IsShow = pasturePb.IsShow_No return res } // computeIfPositiveElse 辅助函数来计算过滤值 func computeIfPositiveElse(newValue, prevFilterValue float64, weightPrev, weightNew float64) float64 { return math.Ceil((prevFilterValue * weightPrev) + (weightNew * newValue)) } // 计算 score 的逻辑 func calculateScore(habit *model.NeckActiveHabit) int { // 第一部分逻辑 var part1 float64 switch { case (habit.CalvingAge <= 1 && habit.Lact >= 1) || (habit.CalvingAge >= 2 && habit.CalvingAge <= 13 && (habit.SumRumina+habit.SumIntake) == 0) || ((habit.Lact == 0 || habit.CalvingAge >= 14) && habit.ChangeFilter == -99): part1 = -199 case habit.CalvingAge >= 2 && habit.CalvingAge <= 13: part1 = math.Min((float64(habit.SumRumina+habit.SumIntake)-(100+math.Min(7, float64(habit.CalvingAge))*60))/10*2, 0) case habit.ChangeFilter > -99: part1 = math.Min(0, math.Min(getValueOrDefault(float64(habit.ChangeFilter), 0), getValueOrDefault(float64(habit.SumMinHigh), 0)))*0.2 + math.Min(0, math.Min(getValueOrDefault(float64(habit.ChangeFilter), 0), getValueOrDefault(float64(habit.SumMinChew), 0)))*0.2 + getRuminaSumIntakeSumScore(float64(habit.SumRumina+habit.SumIntake)) + getAdditionalScore(habit) default: part1 = -299 } // 第二部分逻辑 var part2 float64 switch { case habit.FirmwareVersion%100 >= 52: part2 = 1 case habit.FirmwareVersion%100 >= 30 && habit.FirmwareVersion%100 <= 43: part2 = 0.8 default: part2 = 0.6 } // 最终 score return DefaultScore + int(math.Floor(part1*part2)) } // 获取值或默认值 func getValueOrDefault(value, defaultValue float64) float64 { if value > -99 { return value } return defaultValue } // 计算累计反刍得分 func getRuminaSumIntakeSumScore(sum float64) float64 { switch { case sum < 80: return -30 case sum < 180: return -20 case sum < 280: return -10 default: return 0 } } // 计算额外得分 func getAdditionalScore(habit *model.NeckActiveHabit) float64 { var score float64 if (habit.SumRumina+habit.SumIntake < 280 || habit.SumMinHigh+habit.SumMinChew < -50) && habit.SumMaxHigh > 50 { score += 10 } if habit.ChangeFilter < -30 && habit.ChangeFilter <= habit.SumMinHigh && habit.ChewFilter < -30 && habit.ChewFilter <= habit.SumMinChew { score -= 5 } return score }