package crontab import ( "fmt" "kpt-pasture/model" "kpt-pasture/util" "math" "time" pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow" "gitee.com/xuyiping_admin/pkg/logger/zaplog" "gitee.com/xuyiping_admin/pkg/xerr" "go.uber.org/zap" ) func (e *Entry) NeckRingCalculate() error { pastureList := e.FindPastureList() if pastureList == nil || len(pastureList) == 0 { return nil } if calculateIsRunning { return nil } defer func() { calculateIsRunning = false }() calculateIsRunning = true for _, pasture := range pastureList { if err := e.EntryUpdateActiveHabit(pasture.Id); err != nil { zaplog.Error("NeckRingCalculate", zap.Any("err", err), zap.Any("pasture", pasture)) } zaplog.Info(fmt.Sprintf("NeckRingCalculate Success %d", pasture.Id)) } return nil } func (e *Entry) EntryUpdateActiveHabit(pastureId int64) (err error) { // 获取这段执行数据内最大日期和最小日期 xToday, err := e.XToday(pastureId) if err != nil { return xerr.WithStack(err) } // 未配置的滤波数据不参与计算 if xToday == nil { return nil } var processIds []int64 // 更新活动滤波 processIds, err = e.FirstFilterUpdate(pastureId, xToday) if err != nil { zaplog.Error("NeckRingCalculate", zap.Any("pastureId", pastureId), zap.Any("FirstFilterUpdate", err), zap.Any("xToday", xToday)) } zaplog.Info("NeckRingCalculate", zap.Any("xToday", xToday), zap.Any("processIds", processIds)) if len(processIds) <= 0 { return nil } e.WeeklyUpdateActiveHabit(pastureId, processIds, xToday) // 二次更新滤波 e.SecondUpdateChangeFilter(pastureId, processIds, xToday) // 活动量校正系数和健康评分 e.FilterCorrectAndScoreUpdate(pastureId, processIds, xToday) // 更新 ChangeFilter e.UpdateChangeFilter(pastureId, processIds) // 更新 FilterCorrect e.UpdateFilterCorrect(pastureId, processIds) // 插入群体校正表 e.UpdateChangeAdJust(pastureId, xToday) // 更新所有的显示状态为否的记录为是 e.UpdateIsShow(pastureId, processIds) // 健康预警 e.HealthWarning(pastureId, processIds) return nil } // FirstFilterUpdate 首次更新活动滤波 func (e *Entry) FirstFilterUpdate(pastureId int64, xToDay *XToday) (processIds []int64, err error) { limit := e.Cfg.NeckRingLimit if limit <= 0 { limit = defaultLimit } newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0) if err = e.DB.Model(new(model.NeckActiveHabit)). Where("heat_date >= ?", time.Now().AddDate(0, 0, -1).Format(model.LayoutDate2)). Where("pasture_id = ?", pastureId). Where("is_show = ?", pasturePb.IsShow_No). Where(e.DB.Where("high >= ?", xToDay.High).Or("rumina >= ?", xToDay.Rumina)). Order("heat_date,neck_ring_number,frameid"). Limit(int(limit)). Find(&newNeckActiveHabitList).Error; err != nil { return nil, xerr.WithStack(err) } // 活动量滤波 for _, v := range newNeckActiveHabitList { // 过滤牛只未绑定的脖环的数据 cowInfo := e.GetCowInfoByNeckRingNumber(v.PastureId, v.NeckRingNumber) if cowInfo == nil || cowInfo.Id <= 0 { v.UpdateIsShowOk() if err = e.DB.Model(new(model.NeckActiveHabit)). Select("is_show"). Where("id = ?", v.Id). Updates(v).Error; err != nil { zaplog.Error("EntryUpdateActiveHabit", zap.Any("error", err)) } continue } // 8小时数据不全的不参与滤波 activeTime, _ := util.TimeParseLocal(model.LayoutTime, v.ActiveTime) if v.RecordCount != model.DefaultRecordCount && time.Now().Sub(activeTime).Hours() < 8 { continue } frameId := v.Frameid heatDate := v.HeatDate if v.Frameid == 0 { frameId = 11 heatDateParse, _ := util.TimeParseLocal(model.LayoutDate2, heatDate) heatDate = heatDateParse.AddDate(0, 0, -1).Format(model.LayoutDate2) } else { frameId -= 1 } firstFilterData := e.FindFilterData(pastureId, v.NeckRingNumber, heatDate, frameId) if v.FilterHigh > 0 { firstFilterData.FilterHigh = v.FilterHigh } else { if v.NeckRingNumber == firstFilterData.NeckRingNumber { firstFilterData.FilterHigh = int32(computeIfPositiveElse(float64(v.High), float64(firstFilterData.FilterHigh), 0.23, 0.77)) } else { firstFilterData.FilterHigh = v.High } } if v.FilterRumina > 0 { firstFilterData.FilterRumina = v.FilterRumina } else { if v.NeckRingNumber == firstFilterData.NeckRingNumber { firstFilterData.FilterRumina = int32(computeIfPositiveElse(float64(v.Rumina), float64(firstFilterData.FilterRumina), 0.33, 0.67)) } else { firstFilterData.FilterRumina = v.Rumina } } if v.FilterChew > 0 { firstFilterData.FilterChew = v.FilterChew } else { if v.NeckRingNumber == firstFilterData.NeckRingNumber { firstFilterData.FilterChew = int32(computeIfPositiveElse(float64(v.Rumina+v.Intake), float64(firstFilterData.FilterChew), 0.33, 0.67)) } else { firstFilterData.FilterChew = v.Rumina + v.Intake } } processIds = append(processIds, v.Id) // 更新过滤值 // todo 记得更新胎次为牛只胎次,现在为了测试特意改成0 if err = e.DB.Model(new(model.NeckActiveHabit)). Select("filter_high", "filter_rumina", "filter_chew", "cow_id", "lact", "calving_age", "ear_number"). Where("id = ?", v.Id). Updates(map[string]interface{}{ "filter_high": firstFilterData.FilterHigh, "filter_rumina": firstFilterData.FilterRumina, "filter_chew": firstFilterData.FilterChew, "cow_id": cowInfo.Id, "lact": 0, "calving_age": cowInfo.CalvingAge, "ear_number": cowInfo.EarNumber, }).Error; err != nil { zaplog.Error("FirstFilterUpdate", zap.Any("error", err), zap.Any("firstFilterData", firstFilterData), zap.Any("NeckActiveHabit", v), zap.Any("cowInfo", cowInfo), zap.Any("xToday", xToDay), ) } } return processIds, nil } // SecondUpdateChangeFilter 第二次更新变化趋势滤波 func (e *Entry) SecondUpdateChangeFilter(pastureId int64, processIds []int64, xToday *XToday) { newChangeFilterList := make([]*ChangeFilterData, 0) if err := e.DB.Model(new(model.NeckActiveHabit)). Select("id", "neck_ring_number", "change_high", "change_filter", "rumina_filter", "change_rumina", "chew_filter", "change_chew", "heat_date", "frameid", "IF(lact = 0, 0.8, 1) as xlc_dis_count"). Where("pasture_id = ?", pastureId). Where("id IN (?)", processIds). Where("change_filter = ?", model.InitChangeFilter). Where("change_high > ?", MinChangeHigh). Order("neck_ring_number,heat_date,frameid"). Find(&newChangeFilterList).Error; err != nil { zaplog.Error("SecondUpdateChangeFilter", zap.Any("error", err), zap.Any("xToday", xToday)) return } for _, v := range newChangeFilterList { frameId := v.FrameId heatDate := v.HeatDate if v.FrameId == 0 { frameId = 11 heatDateParse, _ := util.TimeParseLocal(model.LayoutDate2, heatDate) heatDate = heatDateParse.AddDate(0, 0, -1).Format(model.LayoutDate2) } else { frameId -= 1 } xChangeDiscount := float64(xToday.XChangeDiscount) / 10 xRuminaDisc := float64(xToday.XRuminaDisc) / 10 secondFilterData := e.FindFilterData(pastureId, v.NeckRingNumber, heatDate, frameId) if secondFilterData.ChangeFilter <= MinChangeFilter { secondFilterData.ChangeFilter = 0 } if secondFilterData.RuminaFilter <= MinRuminaFilter { secondFilterData.RuminaFilter = 0 } if secondFilterData.ChewFilter <= MinChewFilter { secondFilterData.ChewFilter = 0 } changeFilter := float64(v.ChangeFilter) if v.ChangeFilter <= MinChangeFilter { changeFilter = float64(secondFilterData.ChangeFilter)*(1-xChangeDiscount*v.XlcDisCount) + math.Min(float64(v.ChangeHigh), float64(secondFilterData.ChangeFilter)+135)*xChangeDiscount*v.XlcDisCount } ruminaFilter := float64(v.RuminaFilter) discount := xRuminaDisc * v.XlcDisCount if math.Abs(float64(v.ChangeRumina)) > 60 { discount *= 0.5 } ruminaFilter = float64(secondFilterData.RuminaFilter)*(1-discount) + float64(v.ChangeRumina)*discount chewFilter := float64(v.ChewFilter) chewFilterDiscount := float64(1) if math.Abs(float64(v.ChangeChew)) > 60 { chewFilterDiscount = 0.5 } chewFilter = float64(secondFilterData.ChewFilter)*(1-xRuminaDisc*chewFilterDiscount) + float64(v.ChangeChew)*xRuminaDisc*chewFilterDiscount if err := e.DB.Model(new(model.NeckActiveHabit)). Select("change_filter", "rumina_filter", "chew_filter"). Where("id = ?", v.Id). Updates(map[string]interface{}{ "change_filter": int32(changeFilter), "rumina_filter": int32(math.Min(50, ruminaFilter)), "chew_filter": int32(math.Min(50, chewFilter)), }).Error; err != nil { zaplog.Error("SecondUpdateChangeFilter", zap.Any("error", err), zap.Any("secondFilterData", secondFilterData)) } } } // FilterCorrectAndScoreUpdate 计算活动量变化趋势校正值(活跃度校正)和健康评分 func (e *Entry) FilterCorrectAndScoreUpdate(pastureId int64, processIds []int64, xToday *XToday) { beginDayDate := time.Now() before7DayDate := beginDayDate.AddDate(0, 0, -7).Format(model.LayoutDate2) before1DayDate := beginDayDate.AddDate(0, 0, -1).Format(model.LayoutDate2) neckActiveHabitList := make([]*model.NeckActiveHabit, 0) if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id IN (?)", processIds). Where("pasture_id = ?", pastureId). Find(&neckActiveHabitList).Error; err != nil { zaplog.Error("ActivityVolumeChanges-1", zap.Any("error", err), zap.Any("xToday", xToday)) return } for _, v := range neckActiveHabitList { activityVolume := &ActivityVolume{} if err := e.DB.Model(new(model.NeckActiveHabit)). Select("neck_ring_number", "AVG(IF(change_filter>=60, 60, change_filter)) as avg_filter", "ROUND(STD(IF(change_filter>=60, 60, change_filter))) as std_filter", "COUNT(1) as nb"). Where("heat_date BETWEEN ? AND ?", before7DayDate, before1DayDate). Where("pasture_id = ?", pastureId). Where(e.DB.Where("high > ?", xToday.High).Or("rumina >= ?", xToday.Rumina)). Where("active_time <= ?", beginDayDate.Add(-12*time.Hour).Format(model.LayoutTime)). Where("change_filter > ?", MinChangeFilter). Where("neck_ring_number = ?", v.NeckRingNumber). Having("nb > ?", DefaultNb). First(&activityVolume).Error; err != nil { zaplog.Error("ActivityVolumeChanges-0", zap.Any("error", err), zap.Any("xToday", xToday)) continue } if activityVolume != nil && activityVolume.NeckRingNumber != "" { filterCorrect := model.DefaultFilterCorrect - int(math.Floor(activityVolume.AvgFilter/3+float64(activityVolume.StdFilter)/2)) // 活动量校正系数 if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id = ?", v.Id). Where("neck_ring_number = ?", v.NeckRingNumber). Update("filter_correct", filterCorrect).Error; err != nil { zaplog.Error("ActivityVolumeChanges-2", zap.Any("error", err), zap.Any("xToday", xToday)) continue } } cowScore := calculateScore(v) if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id = ?", v.Id). Update("score", cowScore).Error; err != nil { zaplog.Error("ActivityVolumeChanges-2", zap.Any("error", err), zap.Any("xToday", xToday)) continue } } } func (e *Entry) UpdateChangeFilter(pastureId int64, processIds []int64) { if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id IN (?)", processIds). Where("pasture_id = ?", pastureId). Where("is_show = ?", pasturePb.IsShow_No). Where("change_filter = ?", model.InitChangeFilter). Updates(map[string]interface{}{ "change_filter": model.DefaultChangeFilter, "rumina_filter": model.DefaultRuminaFilter, "chew_filter": model.DefaultChewFilter, }).Error; err != nil { zaplog.Error("UpdateChangeFilter", zap.Any("change_filter", err)) } } func (e *Entry) UpdateFilterCorrect(pastureId int64, processIds []int64) { if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id IN (?)", processIds). Where("pasture_id = ?", pastureId). Where("change_filter < ?", 0). Where("filter_correct < ?", model.DefaultFilterCorrect). Updates(map[string]interface{}{ "filter_correct": model.DefaultFilterCorrect, }).Error; err != nil { zaplog.Error("UpdateFilterCorrect", zap.Any("filter_correct", err)) } } // UpdateChangeAdJust 更新群体校正数据 func (e *Entry) UpdateChangeAdJust(pastureId int64, xToday *XToday) { res := make([]*model.NeckRingBarChange, 0) oneDayAgo := time.Now().AddDate(0, 0, -1).Format(model.LayoutDate2) if err := e.DB.Table(fmt.Sprintf("%s as h", new(model.NeckActiveHabit).TableName())). Select(`h.neck_ring_number,h.heat_date, h.frameid, c.pen_id, c.pen_name, COUNT(*) as nb, ROUND(AVG(h.change_high)) as change_high, ROUND(AVG(h.change_filter)) as change_filter`). Joins("JOIN cow as c ON h.neck_ring_number = c.neck_ring_number"). Where("h.pasture_id = ?", pastureId). Where("h.heat_date >= ?", oneDayAgo). Where("h.is_show = ?", pasturePb.IsShow_Ok). Where("h.cow_id >= ?", 0). Where("c.pen_id > ?", 0). Group("h.heat_date, h.frameid, c.pen_id"). Order("h.heat_date, h.frameid, c.pen_id"). Find(&res).Error; err != nil { zaplog.Error("UpdateChangeAdJust", zap.Any("error", err), zap.Any("xToday", xToday)) } for _, v := range res { if math.Abs(float64(v.ChangeFilter)) < 10 { continue } if err := e.DB.Model(new(model.NeckActiveHabit)). Where("pasture_id = ?", pastureId). Where("neck_ring_number = ?", v.NeckRingNumber). Where("heat_date = ?", v.HeatDate). Where("frameid = ?", v.FrameId). Update("change_adjust", v.ChangeFilter).Error; err != nil { zaplog.Error("UpdateChangeAdJust-1", zap.Any("error", err), zap.Any("xToday", xToday)) } } } func (e *Entry) UpdateIsShow(pastureId int64, processIds []int64) { if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id IN (?)", processIds). Where("pasture_id = ?", pastureId). Update("is_show", pasturePb.IsShow_Ok).Error; err != nil { zaplog.Error("UpdateChangeAdJust-2", zap.Any("error", err)) } } func (e *Entry) XToday(pastureId int64) (*XToday, error) { xToday := &XToday{} systemConfigureList, err := e.GetSystemNeckRingConfigure(pastureId) if err != nil { return nil, xerr.WithStack(err) } if len(systemConfigureList) <= 0 { return nil, nil } for _, v := range systemConfigureList { switch v.Name { case model.MaxHabit: xToday.LastMaxHabitId = v.Value case model.High: xToday.High = int32(v.Value) case model.Rumina: xToday.Rumina = int32(v.Value) case model.XRuminaDisc: xToday.XRuminaDisc = int32(v.Value) case model.XChangeDiscount: xToday.XChangeDiscount = int32(v.Value) case model.WeeklyActive: xToday.WeeklyActive = int32(v.Value) } } return xToday, nil } // WeeklyUpdateActiveHabit 时间点周平均值计算 func (e *Entry) WeeklyUpdateActiveHabit(pastureId int64, processIds []int64, xToDay *XToday) { newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0) if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id IN (?)", processIds). Order("heat_date,neck_ring_number,frameid"). Find(&newNeckActiveHabitList).Error; err != nil { zaplog.Error("WeeklyUpdateActiveHabit", zap.Any("error", err), zap.Any("processIds", processIds)) } if len(newNeckActiveHabitList) <= 0 { return } e.HabitUpdateActiveHabit(pastureId, newNeckActiveHabitList, xToDay) e.SumUpdateActiveHabit(pastureId, newNeckActiveHabitList, xToDay) e.ActiveChange(pastureId, processIds, xToDay) e.Before3DaysNeckActiveHabit(pastureId, processIds) } func (e *Entry) HabitUpdateActiveHabit(pastureId int64, newNeckActiveHabitList []*model.NeckActiveHabit, xToDay *XToday) { for _, v := range newNeckActiveHabitList { // 前七天的 weekHabitData := e.FindWeekHabitData(pastureId, v.NeckRingNumber, v.HeatDate, v.Frameid, xToDay) // 更新过滤值 if err := e.DB.Model(new(model.NeckActiveHabit)). Select("high_habit", "rumina_habit", "chew_habit", "intake_habit", "inactive_habit"). Where("id = ?", v.Id). Updates(map[string]interface{}{ "high_habit": weekHabitData.HighHabit, "rumina_habit": weekHabitData.RuminaHabit, "chew_habit": weekHabitData.ChewHabit, "intake_habit": weekHabitData.IntakeHabit, "inactive_habit": weekHabitData.InactiveHabit, }).Error; err != nil { zaplog.Error("WeeklyUpdateActiveHabit", zap.Error(err), zap.Any("NeckActiveHabit", v), zap.Any("pastureId", pastureId), ) } } } // SumUpdateActiveHabit -- 累计24小时数值 func (e *Entry) SumUpdateActiveHabit(pastureId int64, newNeckActiveHabitList []*model.NeckActiveHabit, xToDay *XToday) { for _, v := range newNeckActiveHabitList { sumHabitData := e.FindSumHabitData(pastureId, v.NeckRingNumber, v.HeatDate, v.Frameid, xToDay) // 更新过滤值 if err := e.DB.Model(new(model.NeckActiveHabit)). Select("sum_rumina", "sum_intake", "sum_inactive", "sum_active", "sum_max_high", "sum_min_high", "sum_min_chew"). Where("id = ?", v.Id). Updates(map[string]interface{}{ "sum_rumina": sumHabitData.SumRumina, "sum_intake": sumHabitData.SumIntake, "sum_inactive": sumHabitData.SumInactive, "sum_active": sumHabitData.SumActive, "sum_max_high": sumHabitData.SumMaxHigh, "sum_min_high": sumHabitData.SumMinHigh, "sum_min_chew": sumHabitData.SumMinChew, }).Error; err != nil { zaplog.Error("WeeklyUpdateActiveHabit", zap.Any("err", err), zap.Any("NeckActiveHabit", v), zap.Any("pastureId", pastureId), ) } } } // ActiveChange -- 变化百分比 func (e *Entry) ActiveChange(pastureId int64, processIds []int64, xToDay *XToday) { newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0) if err := e.DB.Model(new(model.NeckActiveHabit)). Where("pasture_id = ?", pastureId). Where("id IN (?)", processIds). Where("high_habit > ?", 0). Where(e.DB.Where("high >= ?", xToDay.High).Or("rumina >= ?", xToDay.Rumina)). Find(&newNeckActiveHabitList).Error; err != nil { zaplog.Error("ActiveChange", zap.Any("error", err), zap.Any("processIds", processIds)) } for _, v := range newNeckActiveHabitList { changeHigh := calculateChangeHigh(v, xToDay.WeeklyActive) changeRumina := int32(0) changeChew := int32(0) if v.RuminaHabit != 0 { changeRumina = int32(math.Round(float64(v.FilterRumina-v.RuminaHabit) / float64(v.RuminaHabit) * 100)) } if v.ChewHabit != 0 { changeChew = int32(math.Round(float64(v.FilterChew-v.ChewHabit) / float64(v.ChewHabit) * 100)) } // 更新过滤值 if err := e.DB.Model(new(model.NeckActiveHabit)). Select("change_high", "change_rumina", "change_chew"). Where("id = ?", v.Id). Updates(map[string]interface{}{ "change_high": changeHigh, "change_rumina": changeRumina, "change_chew": changeChew, }).Error; err != nil { zaplog.Error("ActiveChange", zap.Any("err", err), zap.Any("NeckActiveHabit", v), ) } } } func (e *Entry) Before3DaysNeckActiveHabit(pastureId int64, processIds []int64) { newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0) if err := e.DB.Model(new(model.NeckActiveHabit)). Where("id IN (?)", processIds). Order("heat_date,neck_ring_number,frameid"). Find(&newNeckActiveHabitList).Error; err != nil { zaplog.Error("Before3DaysNeckActiveHabit", zap.Any("error", err), zap.Any("processIds", processIds)) } for _, v := range newNeckActiveHabitList { before3DaysNeckActiveHabit := e.FindBefore3DaysNeckActiveHabit(pastureId, v.NeckRingNumber, v.HeatDate, v.Frameid) if before3DaysNeckActiveHabit.SumRumina == 0 && before3DaysNeckActiveHabit.SumIntake == 0 { continue } // 更新过滤值 if err := e.DB.Model(new(model.NeckActiveHabit)). Select("before_three_sum_rumina", "before_three_sum_intake"). Where("id = ?", v.Id). Updates(map[string]interface{}{ "before_three_sum_rumina": before3DaysNeckActiveHabit.SumRumina, "before_three_sum_intake": before3DaysNeckActiveHabit.SumIntake, }).Error; err != nil { zaplog.Error("Before3DaysNeckActiveHabit", zap.Error(err), zap.Any("NeckActiveHabit", v), zap.Any("pastureId", pastureId), ) } } } // calculateChangeHigh 计算活动量变化 func calculateChangeHigh(data *model.NeckActiveHabit, weeklyActive int32) int32 { highDiff := data.FilterHigh - data.HighHabit changeHigh := int32(0) if highDiff > 0 { denominator := float64(data.WeekHigh)*0.6 + float64(data.HighHabit)*0.2 + float64(weeklyActive)*0.2 changeHigh = int32(math.Round((float64(highDiff) / denominator) * 100)) } else { changeHigh = int32(math.Round(float64(highDiff) / float64(data.HighHabit) * 100)) } return changeHigh }