package crontab import ( "fmt" "kpt-pasture/model" "math" "time" pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow" "gitee.com/xuyiping_admin/pkg/logger/zaplog" "gitee.com/xuyiping_admin/pkg/xerr" "go.uber.org/zap" ) func (e *Entry) NeckRingCalculate() error { pastureList := e.FindPastureList() if pastureList == nil || len(pastureList) == 0 { return nil } if calculateIsRunning { return nil } defer func() { calculateIsRunning = false }() calculateIsRunning = true for _, pasture := range pastureList { if err := e.EntryUpdateActiveHabit(pasture.Id); err != nil { zaplog.Error("NeckRingCalculate", zap.Any("err", err), zap.Any("pasture", pasture)) } zaplog.Info(fmt.Sprintf("NeckRingCalculate Success %d", pasture.Id)) } return nil } func (e *Entry) EntryUpdateActiveHabit(pastureId int64) (err error) { // 获取这段执行数据内最大日期和最小日期 xToday, err := e.XToday(pastureId) if err != nil { return xerr.WithStack(err) } var processIds []int64 // 更新活动滤波 processIds, err = e.FirstFilterUpdate(pastureId, xToday) if err != nil { zaplog.Error("NeckRingCalculate", zap.Any("FirstFilterUpdate", err), zap.Any("xToday", xToday)) } zaplog.Info("NeckRingCalculate", zap.Any("xToday", xToday), zap.Any("processIds", processIds)) if len(processIds) <= 0 { return nil } e.WeeklyUpdateActiveHabit(pastureId, processIds, xToday) // 二次更新滤波 e.SecondUpdateChangeFilter(pastureId, xToday) // 活动量校正系数和健康评分 e.FilterCorrectAndScoreUpdate(pastureId, processIds, xToday) // 更新 ChangeFilter e.UpdateChangeFilter(pastureId, processIds) // 更新 FilterCorrect e.UpdateFilterCorrect(pastureId, processIds) // 插入群体校正表 e.UpdateChangeAdJust(pastureId, xToday) // 更新所有的显示状态为否的记录为是 e.UpdateIsShow(pastureId, processIds) // 健康预警 e.HealthWarning(pastureId, processIds) return nil } // FirstFilterUpdate 首次更新活动滤波 func (e *Entry) FirstFilterUpdate(pastureId int64, xToDay *XToday) (processIds []int64, err error) { newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0) if err = e.DB.Model(new(model.NeckActiveHabit)). Where("heat_date >= ?", "2025-03-01"). Where("pasture_id = ?", pastureId). Where("is_show = ?", pasturePb.IsShow_No). //Where("record_count = ?", model.DefaultRecordCount). Where(e.DB.Where("high >= ?", xToDay.High).Or("rumina >= ?", xToDay.Rumina)). Where("cow_id > ?", 0). Order("heat_date,neck_ring_number,frameid"). Limit(int(defaultLimit)). Find(&newNeckActiveHabitList).Error; err != nil { return nil, xerr.WithStack(err) } // 活动量滤波 for _, v := range newNeckActiveHabitList { // 过滤牛只未绑定的脖环的数据 cowInfo := e.GetCowInfoByNeckRingNumber(v.PastureId, v.NeckRingNumber) if cowInfo == nil || cowInfo.Id <= 0 { v.UpdateIsShowOk() if err = e.DB.Model(new(model.NeckActiveHabit)). Select("is_show"). Where("id = ?", v.Id). Updates(v).Error; err != nil { zaplog.Error("EntryUpdateActiveHabit", zap.Any("error", err)) } continue } // 8小时数据不全的不参与滤波 activeTime, _ := time.Parse(model.LayoutTime, v.ActiveTime) if v.RecordCount != model.DefaultRecordCount && activeTime.Sub(time.Now()).Hours() < 8 { continue } frameId := v.Frameid heatDate := v.HeatDate if v.Frameid == 0 { frameId = 11 heatDateParse, _ := time.Parse(model.LayoutDate2, heatDate) heatDate = heatDateParse.AddDate(0, 0, -1).Format(model.LayoutDate2) } else { frameId -= 1 } firstFilterData := e.FindFilterData(pastureId, v.NeckRingNumber, heatDate, frameId) if v.FilterHigh > 0 { firstFilterData.FilterHigh = v.FilterHigh } else { if v.NeckRingNumber == firstFilterData.NeckRingNumber { firstFilterData.FilterHigh = int32(computeIfPositiveElse(float64(v.High), float64(firstFilterData.FilterHigh), 0.23, 0.77)) } else { firstFilterData.FilterHigh = v.High } } if v.FilterRumina > 0 { firstFilterData.FilterRumina = v.FilterRumina } else { if v.NeckRingNumber == firstFilterData.NeckRingNumber { firstFilterData.FilterRumina = int32(computeIfPositiveElse(float64(v.Rumina), float64(firstFilterData.FilterRumina), 0.33, 0.67)) } else { firstFilterData.FilterRumina = v.Rumina } } if v.FilterChew > 0 { firstFilterData.FilterChew = v.FilterChew } else { if v.NeckRingNumber == firstFilterData.NeckRingNumber { firstFilterData.FilterChew = int32(computeIfPositiveElse(float64(v.Rumina+v.Intake), float64(firstFilterData.FilterChew), 0.33, 0.67)) } else { firstFilterData.FilterChew = v.Rumina + v.Intake } } processIds = append(processIds, v.Id) // 更新过滤值 // todo 记得更新胎次为牛只胎次,现在为了测试特意改成0 if err = e.DB.Model(new(model.NeckActiveHabit)). Select("filter_high", "filter_rumina", "filter_chew", "cow_id", "lact", "calving_age", "ear_number"). Where("id = ?", v.Id). Updates(map[string]interface{}{ "filter_high": firstFilterData.FilterHigh, "filter_rumina": firstFilterData.FilterRumina, "filter_chew": firstFilterData.FilterChew, "cow_id": cowInfo.Id, "lact": 0, "calving_age": cowInfo.CalvingAge, "ear_number": cowInfo.EarNumber, }).Error; err != nil { zaplog.Error("FirstFilterUpdate", zap.Any("error", err), zap.Any("firstFilterData", firstFilterData), zap.Any("NeckActiveHabit", v), zap.Any("cowInfo", cowInfo), zap.Any("xToday", xToDay), ) } } return processIds, nil } // SecondUpdateChangeFilter 第二次更新变化趋势滤波 func (e *Entry) SecondUpdateChangeFilter(pastureId int64, xToday *XToday) { newChangeFilterList := make([]*ChangeFilterData, 0) if err := e.DB.Model(new(model.NeckActiveHabit)). Select("id", "neck_ring_number", "change_high", "change_filter", "rumina_filter", "change_rumina", "chew_filter", "change_chew", "heat_date", "frameid", "IF(lact = 0, 0.8, 1) as xlc_dis_count"). Where("heat_date >= ?", time.Now().AddDate(0, 0, -2).Format(model.LayoutDate2)). Where("pasture_id = ?", pastureId). Where("change_filter = ?", model.InitChangeFilter). Where("change_high > ?", MinChangeHigh). Order("neck_ring_number,heat_date,frameid"). Limit(int(defaultLimit)). Find(&newChangeFilterList).Error; err != nil { zaplog.Error("SecondUpdateChangeFilter", zap.Any("error", err), zap.Any("xToday", xToday)) return } for _, v := range newChangeFilterList { frameId := v.FrameId heatDate := v.HeatDate if v.FrameId == 0 { frameId = 11 heatDateParse, _ := time.Parse(model.LayoutDate2, heatDate) heatDate = heatDateParse.AddDate(0, 0, -1).Format(model.LayoutDate2) } else { frameId -= 1 } secondFilterData := e.FindFilterData(pastureId, v.NeckRingNumber, heatDate, frameId) if v.ChangeFilter > MinChangeFilter { secondFilterData.ChangeFilter = v.ChangeFilter } else { if v.NeckRingNumber == secondFilterData.NeckRingNumber { changeFilter := float64(secondFilterData.ChangeFilter)*(1-(float64(xToday.XChangeDiscount)/10)*v.XlcDisCount) + math.Min(float64(v.ChangeHigh), float64(secondFilterData.ChangeFilter)+135)*(float64(xToday.XChangeDiscount)/10)*v.XlcDisCount secondFilterData.ChangeFilter = int32(changeFilter) } else { secondFilterData.ChangeFilter = 0 } } if v.RuminaFilter > MinRuminaFilter { secondFilterData.RuminaFilter = v.ChangeFilter } else { if v.NeckRingNumber == secondFilterData.NeckRingNumber { discount := float64(xToday.XRuminaDisc) / 10 * v.XlcDisCount if math.Abs(float64(v.ChangeRumina)) > 60 { discount *= 0.5 } secondFilterData.RuminaFilter = int32(float64(secondFilterData.RuminaFilter)*(1-discount) + float64(v.ChangeRumina)*discount) } else { secondFilterData.RuminaFilter = 0 } } secondFilterData.RuminaFilter = int32(math.Min(50, float64(secondFilterData.RuminaFilter))) if v.ChewFilter > MinChewFilter { secondFilterData.ChewFilter = v.ChangeChew } else { if v.NeckRingNumber == secondFilterData.NeckRingNumber { discount := float64(xToday.XRuminaDisc) / 10 if math.Abs(float64(v.ChangeChew)) > 60 { discount *= 0.5 } secondFilterData.ChewFilter = int32(float64(secondFilterData.ChewFilter)*(1-discount) + float64(v.ChangeChew)*discount) } else { secondFilterData.ChewFilter = 0 } } secondFilterData.ChewFilter = int32(math.Min(50, float64(secondFilterData.ChewFilter))) if err := e.DB.Model(new(model.NeckActiveHabit)). Select("change_filter", "rumina_filter", "chew_filter"). Where("id = ?", v.Id). Updates(map[string]interface{}{ "change_filter": secondFilterData.ChangeFilter, "rumina_filter": secondFilterData.RuminaFilter, "chew_filter": secondFilterData.ChewFilter, }).Error; err != nil { zaplog.Error("SecondUpdateChangeFilter", zap.Any("error", err), zap.Any("secondFilterData", secondFilterData)) } } } // FilterCorrectAndScoreUpdate 计算活动量变化趋势校正值(活跃度校正)和健康评分 func (e *Entry) FilterCorrectAndScoreUpdate(pastureId int64, processIds []int64, xToday *XToday) { beginDayDate := time.Now() before7DayDate := beginDayDate.AddDate(0, 0, -7).Format(model.LayoutDate2) before1DayDate := beginDayDate.AddDate(0, 0, -1).Format(model.LayoutDate2) neckActiveHabitList := make([]*model.NeckActiveHabit, 0) if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id IN (?)", processIds). Where("pasture_id = ?", pastureId). Find(&neckActiveHabitList).Error; err != nil { zaplog.Error("ActivityVolumeChanges-1", zap.Any("error", err), zap.Any("xToday", xToday)) return } for _, v := range neckActiveHabitList { activityVolume := &ActivityVolume{} if err := e.DB.Model(new(model.NeckActiveHabit)). Select("neck_ring_number", "AVG(IF(change_filter>=60, 60, change_filter)) as avg_filter", "ROUND(STD(IF(change_filter>=60, 60, change_filter))) as std_filter", "COUNT(1) as nb"). Where("heat_date BETWEEN ? AND ?", before7DayDate, before1DayDate). Where("pasture_id = ?", pastureId). Where(e.DB.Where("high > ?", xToday.High).Or("rumina >= ?", xToday.Rumina)). Where("active_time <= ?", beginDayDate.Add(-12*time.Hour).Format(model.LayoutTime)). Where("change_filter > ?", MinChangeFilter). Where("neck_ring_number = ?"). Having("nb > ?", DefaultNb). First(&activityVolume).Error; err != nil { zaplog.Error("ActivityVolumeChanges-0", zap.Any("error", err), zap.Any("xToday", xToday)) continue } if activityVolume != nil && activityVolume.NeckRingNumber != "" { filterCorrect := model.DefaultFilterCorrect - int(math.Floor(activityVolume.AvgFilter/3+float64(activityVolume.StdFilter)/2)) // 活动量校正系数 if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id = ?", v.Id). Where("neck_ring_number = ?", v.NeckRingNumber). Update("filter_correct", filterCorrect).Error; err != nil { zaplog.Error("ActivityVolumeChanges-2", zap.Any("error", err), zap.Any("xToday", xToday)) continue } } cowScore := calculateScore(v) if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id = ?", v.Id). Update("score", cowScore).Error; err != nil { zaplog.Error("ActivityVolumeChanges-2", zap.Any("error", err), zap.Any("xToday", xToday)) continue } } } func (e *Entry) UpdateChangeFilter(pastureId int64, processIds []int64) { if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id IN (?)", processIds). Where("pasture_id = ?", pastureId). Where("is_show = ?", pasturePb.IsShow_No). Where("change_filter = ?", model.InitChangeFilter). Updates(map[string]interface{}{ "change_filter": model.DefaultChangeFilter, "rumina_filter": model.DefaultRuminaFilter, "chew_filter": model.DefaultChewFilter, }).Error; err != nil { zaplog.Error("UpdateChangeFilter", zap.Any("change_filter", err)) } } func (e *Entry) UpdateFilterCorrect(pastureId int64, processIds []int64) { if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id IN (?)", processIds). Where("pasture_id = ?", pastureId). Where("change_filter < ?", 0). Where("filter_correct < ?", model.DefaultFilterCorrect). Updates(map[string]interface{}{ "filter_correct": model.DefaultFilterCorrect, }).Error; err != nil { zaplog.Error("UpdateFilterCorrect", zap.Any("filter_correct", err)) } } // UpdateChangeAdJust 更新群体校正数据 func (e *Entry) UpdateChangeAdJust(pastureId int64, xToday *XToday) { /*-- 插入群体校正表 INSERT INTO data_bar_change(heatdate, frameid, intCurBar, intCurBarName, nb, highchange, changefilter) SELECT h.heatdate, h.frameid, c.intCurBar, c.intCurBarName, COUNT(*) nb, ROUND(AVG(h.highchange)) highchange, ROUND(AVG(h.changefilter) ) changefilter FROM h_activehabit h JOIN cow c ON h.intCowId=c.intCowId WHERE h.heatdate>=(CURDATE() -INTERVAL 1 DAY ) GROUP BY h.heatdate, h.frameid, c.intCurBar ORDER BY h.heatdate, h.frameid, c.intCurBarName ON DUPLICATE KEY UPDATE nb = VALUES(nb), highchange = VALUES(highchange), changefilter = VALUES(changefilter); UPDATE h_activehabit h JOIN cow c ON h.intCowId=c.intCowId JOIN data_bar_change cg ON h.heatdate=cg.heatdate AND h.frameid=cg.frameid AND c.intCurBar=cg.intCurBar SET h.changeadjust=cg.changefilter WHERE h.id>xBeg_update_act_Id AND h.heatdate>=CURRENT_DATE() - INTERVAL 1 DAY AND ABS(cg.changefilter)>=10; */ res := make([]*model.NeckRingBarChange, 0) oneDayAgo := time.Now().AddDate(0, 0, -1).Format(model.LayoutDate2) if err := e.DB.Table(fmt.Sprintf("%s as h", new(model.NeckActiveHabit).TableName())). Select("h.neck_ring_number,h.heat_date, h.frameid, c.pen_id, c.pen_name, COUNT(*) as nb, ROUND(AVG(h.change_high)) as change_high, ROUND(AVG(h.change_filter)) as change_filter"). Joins("JOIN cow as c ON h.neck_ring_number = c.neck_ring_number"). Where("h.pasture_id = ?", pastureId). Where("h.heat_date >= ?", oneDayAgo). Where("h.cow_id >= ?", 0). Group("h.heat_date, h.frameid, c.pen_id"). Order("h.heat_date, h.frameid, c.pen_name"). Find(&res).Error; err != nil { zaplog.Error("UpdateChangeAdJust", zap.Any("error", err), zap.Any("xToday", xToday)) } // todo ABS(cg.changefilter)>=10; for _, v := range res { if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id > ?", xToday.LastMaxHabitId). Where("heat_date = ?", v.HeatDate). Where("frameid = ?", v.FrameId). Where("neck_ring_number = ?", v.NeckRingNumber). Update("change_adjust", v.ChangeHigh).Error; err != nil { zaplog.Error("UpdateChangeAdJust-1", zap.Any("error", err), zap.Any("xToday", xToday)) } } } func (e *Entry) UpdateIsShow(pastureId int64, processIds []int64) { if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id IN (?)", processIds). Where("pasture_id = ?", pastureId). Update("is_show", pasturePb.IsShow_Ok).Error; err != nil { zaplog.Error("UpdateChangeAdJust-2", zap.Any("error", err)) } } func (e *Entry) XToday(pastureId int64) (*XToday, error) { xToday := &XToday{} systemConfigureList, err := e.GetSystemNeckRingConfigure(pastureId) if err != nil { return nil, xerr.WithStack(err) } for _, v := range systemConfigureList { switch v.Name { case model.MaxHabit: xToday.LastMaxHabitId = v.Value case model.High: xToday.High = int32(v.Value) case model.Rumina: xToday.Rumina = int32(v.Value) case model.XRuminaDisc: xToday.XRuminaDisc = int32(v.Value) case model.XChangeDiscount: xToday.XChangeDiscount = int32(v.Value) case model.WeeklyActive: xToday.WeeklyActive = int32(v.Value) } } return xToday, nil } // WeeklyUpdateActiveHabit 时间点周平均值计算 func (e *Entry) WeeklyUpdateActiveHabit(pastureId int64, processIds []int64, xToDay *XToday) { newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0) if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id IN (?)", processIds). Order("heat_date,neck_ring_number,frameid"). Find(&newNeckActiveHabitList).Error; err != nil { zaplog.Error("WeeklyUpdateActiveHabit", zap.Any("error", err), zap.Any("processIds", processIds)) } if len(newNeckActiveHabitList) <= 0 { return } e.HabitUpdateActiveHabit(pastureId, newNeckActiveHabitList, xToDay) e.SumUpdateActiveHabit(pastureId, newNeckActiveHabitList, xToDay) e.ActiveChange(pastureId, processIds, xToDay) e.Before3DaysNeckActiveHabit(pastureId, processIds) } func (e *Entry) HabitUpdateActiveHabit(pastureId int64, newNeckActiveHabitList []*model.NeckActiveHabit, xToDay *XToday) { for _, v := range newNeckActiveHabitList { // 前七天的 weekHabitData := e.FindWeekHabitData(pastureId, v.NeckRingNumber, v.HeatDate, v.Frameid, xToDay) // 更新过滤值 if err := e.DB.Model(new(model.NeckActiveHabit)). Select("high_habit", "rumina_habit", "chew_habit", "intake_habit", "inactive_habit"). Where("id = ?", v.Id). Updates(map[string]interface{}{ "high_habit": weekHabitData.HighHabit, "rumina_habit": weekHabitData.RuminaHabit, "chew_habit": weekHabitData.ChewHabit, "intake_habit": weekHabitData.IntakeHabit, "inactive_habit": weekHabitData.InactiveHabit, }).Error; err != nil { zaplog.Error("WeeklyUpdateActiveHabit", zap.Error(err), zap.Any("NeckActiveHabit", v), zap.Any("pastureId", pastureId), ) } } } // SumUpdateActiveHabit -- 累计24小时数值 func (e *Entry) SumUpdateActiveHabit(pastureId int64, newNeckActiveHabitList []*model.NeckActiveHabit, xToDay *XToday) { for _, v := range newNeckActiveHabitList { sumHabitData := e.FindSumHabitData(pastureId, v.NeckRingNumber, v.HeatDate, v.Frameid, xToDay) // 更新过滤值 if err := e.DB.Model(new(model.NeckActiveHabit)). Select("sum_rumina", "sum_intake", "sum_inactive", "sum_active", "sum_max_high", "sum_min_high", "sum_min_chew"). Where("id = ?", v.Id). Updates(map[string]interface{}{ "sum_rumina": sumHabitData.SumRumina, "sum_intake": sumHabitData.SumIntake, "sum_inactive": sumHabitData.SumInactive, "sum_active": sumHabitData.SumActive, "sum_max_high": sumHabitData.SumMaxHigh, "sum_min_high": sumHabitData.SumMinHigh, "sum_min_chew": sumHabitData.SumMinChew, }).Error; err != nil { zaplog.Error("WeeklyUpdateActiveHabit", zap.Any("err", err), zap.Any("NeckActiveHabit", v), zap.Any("pastureId", pastureId), ) } } } // ActiveChange -- 变化百分比 func (e *Entry) ActiveChange(pastureId int64, processIds []int64, xToDay *XToday) { newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0) if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id IN (?)", processIds). Where("high_habit > ?", 0). Where(e.DB.Where("high >= ?", xToDay.High).Or("rumina >= ?", xToDay.Rumina)). Find(&newNeckActiveHabitList).Error; err != nil { zaplog.Error("ActiveChange", zap.Any("error", err), zap.Any("processIds", processIds)) } for _, v := range newNeckActiveHabitList { highDiff := v.FilterHigh - v.HighHabit denominator := float64(v.WeekHigh)*0.6 + float64(v.HighHabit)*0.2 + float64(xToDay.WeeklyActive)*0.2 if highDiff > 0 { v.ChangeHigh = int32(math.Round((float64(highDiff) / denominator / float64(v.HighHabit)) * 100)) } else { v.ChangeHigh = int32(math.Round(float64(highDiff) / float64(v.HighHabit) * 100)) } if v.RuminaHabit != 0 { v.ChangeRumina = int32(math.Round(float64(v.FilterRumina-v.RuminaHabit) / float64(v.RuminaHabit) * 100)) } if v.ChewHabit != 0 { v.ChangeChew = int32(math.Round(float64(v.FilterChew-v.ChewHabit) / float64(v.ChewHabit) * 100)) } // 更新过滤值 if err := e.DB.Model(new(model.NeckActiveHabit)). Select("change_high", "change_rumina", "change_chew"). Where("id = ?", v.Id). Updates(map[string]interface{}{ "change_high": v.ChangeHigh, "change_rumina": v.ChangeRumina, "change_chew": v.ChangeChew, }).Error; err != nil { zaplog.Error("WeeklyUpdateActiveHabit", zap.Error(err), zap.Any("NeckActiveHabit", v), zap.Any("pastureId", pastureId), ) } } } func (e *Entry) Before3DaysNeckActiveHabit(pastureId int64, processIds []int64) { newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0) if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id IN (?)", processIds). Order("heat_date,neck_ring_number,frameid"). Find(&newNeckActiveHabitList).Error; err != nil { zaplog.Error("Before3DaysNeckActiveHabit", zap.Any("error", err), zap.Any("processIds", processIds)) } for _, v := range newNeckActiveHabitList { before3DaysNeckActiveHabit := e.FindBefore3DaysNeckActiveHabit(pastureId, v.NeckRingNumber, v.HeatDate, v.Frameid) if before3DaysNeckActiveHabit.SumRumina == 0 && before3DaysNeckActiveHabit.SumIntake == 0 { continue } // 更新过滤值 if err := e.DB.Model(new(model.NeckActiveHabit)). Select("before_three_sum_rumina", "before_three_sum_intake"). Where("id = ?", v.Id). Updates(map[string]interface{}{ "before_three_sum_rumina": before3DaysNeckActiveHabit.SumRumina, "before_three_sum_intake": before3DaysNeckActiveHabit.SumIntake, }).Error; err != nil { zaplog.Error("Before3DaysNeckActiveHabit", zap.Error(err), zap.Any("NeckActiveHabit", v), zap.Any("pastureId", pastureId), ) } } }