package mqtt import ( "fmt" "kpt-pasture/model" "kpt-pasture/module/crontab" "math" "time" "gitee.com/xuyiping_admin/pkg/logger/zaplog" "go.uber.org/zap" pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow" "gitee.com/xuyiping_admin/pkg/xerr" ) const ( MinChangeFilter = -99 MinRuminaFilter = -99 MinChewFilter = -99 MinChangeHigh = -99 DefaultNb = 30 ) func (e *Entry) PastureUpdateActiveHabit() { pastureList := e.FindPastureList() if pastureList == nil || len(pastureList) == 0 { return } for _, pasture := range pastureList { if err := e.EntryUpdateActiveHabit(pasture.Id); err != nil { zaplog.Error("PastureUpdateActiveHabit", zap.Any("PastureUpdateActiveHabit", err), zap.Any("pasture", pasture)) } zaplog.Info(fmt.Sprintf("PastureUpdateActiveHabit Success %d", pasture.Id)) } } func (e *Entry) EntryUpdateActiveHabit(pastureId int64) (err error) { lastMaxHabitData, err := e.GetSystemConfigure2(pastureId, model.MaxHabit) if err != nil { return xerr.WithStack(err) } lastMaxHabitId := lastMaxHabitData.Value currentMaxHabit := &model.NeckActiveHabit{} if err = e.DB.Model(currentMaxHabit). Where("id > ?", lastMaxHabitId). Order("id desc").First(currentMaxHabit).Error; err != nil { return xerr.WithStack(err) } // 本次执行<=上次执行的id,则不执行 if currentMaxHabit.Id < lastMaxHabitId { return nil } // 统一更新is_max_time为0 if err = e.DB.Model(new(model.NeckActiveHabit)). Where("is_max_time = ?", pasturePb.IsShow_Ok). Where("heat_date > ?", time.Now().AddDate(0, 0, -10).Format(model.LayoutDate2)). Update("is_max_time", pasturePb.IsShow_No).Error; err != nil { return xerr.WithStack(err) } // 获取这段执行数据内最大日期和最小日期 xToday := &crontab.XToday{} if err = e.DB.Model(new(model.NeckActiveHabit)). Select(`MIN(heat_date) as x_beg_date, MAX(heat_date) as x_end_date`). Where("id BETWEEN ? AND ?", lastMaxHabitId, currentMaxHabit.Id). First(xToday).Error; err != nil { return xerr.WithStack(err) } xToday.LastMaxHabitId = lastMaxHabitId xToday.CurrMaxHabitId = currentMaxHabit.Id minHeatDateParse, err := time.Parse(model.LayoutDate2, xToday.XBegDate) if err != nil { return xerr.WithStack(err) } xBefore2Day := minHeatDateParse.AddDate(0, 0, -1).Format(model.LayoutDate2) xBefore7Day := minHeatDateParse.AddDate(0, 0, -7).Format(model.LayoutDate2) xMin2Id, err := e.GetMinIdByHeatDate(xBefore2Day, xToday.LastMaxHabitId) if err != nil { return xerr.WithStack(err) } xMin7Id, err := e.GetMinIdByHeatDate(xBefore7Day, xToday.LastMaxHabitId) if err != nil { return xerr.WithStack(err) } defer func() { // 更新最后一次执行的id值 if err == nil { e.DB.Model(new(model.SystemConfigure)). Where("name = ?", model.MaxHabit). Where("pasture_id = ?", pastureId). Update("value", currentMaxHabit.Id) } }() xToday.XMin2Id = xMin2Id xToday.XMin7Id = xMin7Id // id到上一次执行结果并且heat_date > 7天之前的最大牛只id置为is_max_time=1 maxHabitIdArray := make([]*model.MaxHabitIdModel, 0) if err = e.DB.Model(new(model.NeckActiveHabit)). Select("Max(id) as id"). Where("change_filter > ?", model.DefaultChangeFilter). Where("heat_date > ?", xBefore7Day). Group("neck_ring_number"). Find(&maxHabitIdArray).Error; err != nil { return xerr.WithStack(err) } if len(maxHabitIdArray) > 0 { maxHabitIds := make([]int64, 0) for _, v := range maxHabitIdArray { maxHabitIds = append(maxHabitIds, v.Id) } if err = e.DB.Model(new(model.NeckActiveHabit).TableName()). Where("id IN (?)", maxHabitIds). Update("is_max_time", pasturePb.IsShow_Ok).Error; err != nil { return xerr.WithStack(err) } } activeLowest, err := e.GetSystemConfigure2(pastureId, model.ActiveLowest) if err != nil { return xerr.WithStack(err) } ruminaLowest, err := e.GetSystemConfigure2(pastureId, model.RuminaLowest) if err != nil { return xerr.WithStack(err) } xToday.ActiveLowest = activeLowest.Value xToday.RuminaLowest = ruminaLowest.Value // 更新活动滤波 if err = e.FilterUpdate(pastureId, xToday); err != nil { zaplog.Error("EntryUpdateActiveHabit", zap.Any("FilterUpdate", err), zap.Any("xToday", xToday)) return xerr.WithStack(err) } // 更新周平均值 if err = e.WeeklyActiveAvgUpdate(pastureId, xToday); err != nil { zaplog.Error("EntryUpdateActiveHabit", zap.Any("WeeklyActiveAvgUpdate", err), zap.Any("xToday", xToday)) return xerr.WithStack(err) } if err = e.ActivityVolumeChanges(pastureId, xToday); err != nil { zaplog.Error("EntryUpdateActiveHabit", zap.Any("ActivityVolumeChanges", err), zap.Any("xToday", xToday)) return xerr.WithStack(err) } return nil } // FilterUpdate 更新活动滤波 func (e *Entry) FilterUpdate(pastureId int64, xToDay *crontab.XToday) error { newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0) if err := e.DB.Model(new(model.NeckActiveHabit)). Where("pasture_id = ?", pastureId). Where(e.DB.Where("change_filter = ?", model.InitChangeFilter).Or("is_max_time = ?", pasturePb.IsShow_Ok)). Where(e.DB.Where("high >= ?", xToDay.ActiveLowest).Or("rumina >= ?", xToDay.RuminaLowest)). Order("neck_ring_number,id"). Find(&newNeckActiveHabitList).Error; err != nil { return xerr.WithStack(err) } var filterValues = make(map[string]*model.NeckActiveHabit) // 活动量滤波 for _, v := range newNeckActiveHabitList { prev, ok := filterValues[v.NeckRingNumber] if !ok { if v.FilterHigh <= 0 { v.FilterHigh = v.High } if v.FilterRumina <= 0 { v.FilterRumina = v.Rumina } if v.FilterChew <= 0 { v.FilterChew = v.Rumina + v.Intake } filterValues[v.NeckRingNumber] = v continue } if v.FilterHigh <= 0 { v.FilterHigh = int32(computeIfPositiveElse(float64(v.High), float64(prev.FilterHigh), 0.23, 0.77)) } if v.FilterRumina <= 0 { v.FilterRumina = int32(computeIfPositiveElse(float64(v.Rumina), float64(prev.FilterRumina), 0.33, 0.67)) } if v.FilterChew <= 0 { v.FilterChew = int32(computeIfPositiveElse(float64(v.Rumina+v.Intake), float64(prev.FilterChew), 0.33, 0.67)) } // 更新过滤值 filterValues[v.NeckRingNumber] = v if err := e.DB.Model(new(model.NeckActiveHabit)). Select("filter_high", "filter_rumina", "filter_chew"). Where("id = ?", v.Id). Updates(v).Error; err != nil { return xerr.WithStack(err) } } zaplog.Info("EntryUpdateActiveHabit-FilterUpdate-Success") return nil } func (e *Entry) WeeklyActiveAvgUpdate(pastureId int64, xToday *crontab.XToday) error { beginDayDate, err := time.Parse(model.LayoutDate2, xToday.XBegDate) if err != nil { return xerr.WithStack(err) } before7DayDate := beginDayDate.AddDate(0, 0, -7).Format(model.LayoutDate2) before1DayDate := beginDayDate.AddDate(0, 0, -1).Format(model.LayoutDate2) weeklyActive, err := e.GetSystemConfigure2(pastureId, model.WeeklyActive) if err != nil { return xerr.WithStack(err) } xframeId := int64(0) maxXframeId := int64(11) xStartDate, _ := time.Parse(model.LayoutDate2, xToday.XBegDate) xEndDate, _ := time.Parse(model.LayoutDate2, xToday.XEndDate) for xStartDate.Format(model.LayoutDate2) < xEndDate.Format(model.LayoutDate2) || (xStartDate == xEndDate && xframeId <= maxXframeId) { // 时间点周平均 AvgHabitList := make([]*crontab.AvgHabit, 0) if err = e.DB.Model(new(model.NeckActiveHabit)). Select("neck_ring_number"). Select("IF(COUNT(1)>=3, ROUND((SUM(filter_high) -MIN(filter_high) -MAX(filter_high))/ABS(COUNT(1) -2),0), -1) as avg_high_habit"). Select("IF(COUNT(1)>=3, ROUND((SUM(filter_rumina) -MIN(filter_rumina) -MAX(filter_rumina))/ABS(COUNT(1) -2),0), -1) as avg_rumina_habit"). Select("IF(COUNT(1)>=3, ROUND((SUM(filter_chew) -MIN(filter_chew) -MAX(filter_chew))/ABS(COUNT(1) -2),0), -1) as avg_chew_habit"). Select("ROUND(AVG(intake),0) as avg_intake_habit"). Select("ROUND(AVG(inactive),0) as avg_inactive_habit"). Where("id BETWEEN ? AND ?", xToday.XMin7Id, xToday.CurrMaxHabitId). Where("heat_date BETWEEN ? AND ?", before7DayDate, before1DayDate). Where("frameid = ?", xframeId). Where("pasture_id = ?", pastureId). Where("change_filter = ?", model.InitChangeFilter). Where(e.DB.Where("high > ?", xToday.ActiveLowest).Or("rumina > ?", xToday.RuminaLowest)). Group("neck_ring_number"). Find(&AvgHabitList).Error; err != nil { return xerr.WithStack(err) } for _, v := range AvgHabitList { if err = e.DB.Model(new(model.NeckActiveHabit)). Select("avg_high_habit", "avg_rumina_habit", "avg_chew_habit", "avg_intake_habit", "avg_inactive_habit"). Where("neck_ring_number = ?", v.NeckRingNumber). Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId). Where("frameid = ?", xframeId). Where("change_filter = ?", model.InitChangeFilter). Where("heat_date = ?", xStartDate). Updates(v).Error; err != nil { return xerr.WithStack(err) } } // 累计24小时数值 sumHabitList := make([]*crontab.SumHabit, 0) if err = e.DB.Model(new(model.NeckActiveHabit)). Select("neck_ring_number"). Select("IF(COUNT(1)>6, ROUND(AVG( h2.filter_rumina)*12,0), 0) as sum_rumina"). Select("IF(COUNT(1)>6, ROUND(AVG( h2.intake)*12,0), 0) as sum_intake"). Select("IF(COUNT(1)>6, ROUND(AVG( h2.inactive)*12,0), 0) as sum_inactive"). Select("IF(COUNT(1)>6, ROUND(AVG( h2.active)*12,0), 0) as sum_active"). Select("MAX(h2.change_filter) as sum_max_high"). Select("MIN(IF(change_filter > ?, change_filter, 0)) as sum_min_high", MinChangeFilter). Select("MIN( CASE WHEN filter_chew > ? THEN filter_chew WHEN filter_rumina >= ? THEN filter_rumina ELSE 0 END) as sum_min_chew", MinChangeFilter, MinRuminaFilter). Where("id BETWEEN ? AND ?", xToday.XMin2Id, xToday.CurrMaxHabitId). Where("pasture_id = ?", pastureId). Where("heat_date BETWEEN ? AND ?", xStartDate.AddDate(0, 0, -1).Format(model.LayoutDate2), xStartDate.Format(model.LayoutDate2)). Where("created_at BETWEEN ? AND ?", xStartDate.Add(-23*time.Hour).Unix(), xStartDate.Unix()). Where(e.DB.Where("high > ?", xToday.ActiveLowest).Or("rumina >= ?", xToday.RuminaLowest)). Group("neck_ring_number"). Find(&sumHabitList).Error; err != nil { return xerr.WithStack(err) } for _, v := range sumHabitList { if err = e.DB.Model(new(model.NeckActiveHabit)). Select("sum_rumina", "sum_intake", "sum_inactive", "sum_active", "sum_max_high", "sum_min_high", "sum_min_chew"). Where("neck_ring_number = ?", v.NeckRingNumber). Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId). Where("heat_date = ?", xStartDate.Format(model.LayoutDate2)). Where("frameid = ?", xframeId). Where("change_filter = ?", model.InitChangeFilter). Updates(v).Error; err != nil { return xerr.WithStack(err) } } // 变化百分比 changeHabitList := make([]*model.NeckActiveHabit, 0) if err = e.DB.Model(new(model.NeckActiveHabit)). Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId). Where("heat_date = ?", xStartDate.Format(model.LayoutDate2)). Where("frameid = ?", xframeId). Where("pasture_id = ?", pastureId). Where("change_filter = ?", model.InitChangeFilter). Where("avg_high_habit > ?", 0). Where(e.DB.Where("high > ?", xToday.ActiveLowest).Or("rumina >= ?", xToday.RuminaLowest)). Find(&changeHabitList).Error; err != nil { return xerr.WithStack(err) } for _, v := range changeHabitList { if v.FilterHigh-v.AvgHighHabit > 0 { v.ChangeHigh = (v.FilterHigh - v.AvgHighHabit) / int32(float64(v.WeekHigh)*0.6+float64(v.AvgHighHabit)*0.2+float64(weeklyActive.Value)*0.2) } else { v.ChangeHigh = v.FilterHigh - v.AvgHighHabit/v.AvgHighHabit*100 } v.ChangeRumina = v.RuminaFilter - v.AvgRuminaHabit/v.AvgHighHabit*100 v.ChangeChew = v.FilterChew - v.AvgChewHabit/v.AvgHighHabit*100 if err = e.DB.Model(new(model.NeckActiveHabit)). Select("change_high", "change_rumina", "change_chew"). Where("id = ?", v.Id). Updates(v).Error; err != nil { return xerr.WithStack(err) } } if xframeId == maxXframeId { xframeId = 0 xStartDate = xStartDate.AddDate(0, 0, 1) } else { xframeId++ } } zaplog.Info("EntryUpdateActiveHabit-WeeklyActiveAvgUpdate-Success") return nil } // UpdateChangeFilter 变化趋势滤波 func (e *Entry) UpdateChangeFilter(pastureId int64, xToday *crontab.XToday) (err error) { xRuminaDisc, err := e.GetSystemConfigure2(pastureId, model.XRuminaDisc) if err != nil { return err } xChangeDiscount, err := e.GetSystemConfigure2(pastureId, model.XChangeDiscount) if err != nil { return err } newChangeFilterList := make([]*crontab.ChangeFilterData, 0) if err = e.DB.Model(new(model.NeckActiveHabit)). Select("id,neck_ring_number,change_high,change_filter,rumina_filter,change_rumina,chew_filter,change_chew"). Select("IF(lact=0,0.8,1) as xlc_dis_count"). Where("id BETWEEN ? AND ?", xToday.XMin2Id, xToday.CurrMaxHabitId). Where("pasture_id = ?", pastureId). Where(e.DB.Where("change_filter = ?", model.InitChangeFilter).Or("is_max_time = ?", pasturePb.IsShow_Ok)). Where("change_high > ?", MinChangeHigh). Order("neck_ring_number,heat_date,frameid"). Find(&newChangeFilterList).Error; err != nil { return xerr.WithStack(err) } var filterValues = make(map[string]*crontab.ChangeFilterData) for _, v := range newChangeFilterList { prev, ok := filterValues[v.NeckRingNumber] if v.ChangeFilter <= MinChangeFilter { prefChangeFilter := int32(0) if ok { prefChangeFilter = prev.ChangeFilter } leastValue := v.HighChange if prefChangeFilter < v.HighChange { leastValue = prefChangeFilter } v.ChangeFilter = int32(float64(prefChangeFilter)*(1-(float64(xChangeDiscount.Value)/10)*v.XlcDisCount) + float64(leastValue)*(float64(xChangeDiscount.Value)/10)*v.XlcDisCount) } if v.RuminaFilter <= MinChangeFilter { prefRuminaFilter := int32(0) if ok { prefRuminaFilter = prev.RuminaFilter } factor := float64(1) if math.Abs(float64(v.ChangeRumina)) > 60 { factor = 0.5 } v.RuminaFilter = int32(float64(prefRuminaFilter)*(1-float64(xRuminaDisc.Value/10)*v.XlcDisCount*factor) + float64(v.ChangeRumina)*float64(xRuminaDisc.Value)/10*v.XlcDisCount*factor) } if v.RuminaFilter > 50 { v.RuminaFilter = 50 } if v.ChewFilter <= MinChangeFilter { prefChewFilter := int32(0) if ok { prefChewFilter = prev.ChewFilter } factor := float64(1) if math.Abs(float64(v.ChangeChew)) > 60 { factor = 0.5 } v.ChewFilter = int32(float64(prefChewFilter)*(1-float64(xRuminaDisc.Value)/10*factor) + float64(v.ChangeChew)*float64(xRuminaDisc.Value)/10*factor) } if v.ChewFilter > 50 { v.ChangeChew = 50 } if err = e.DB.Model(new(model.NeckActiveHabit)). Select("change_filter", "rumina_filter", "chew_filter"). Where("id = ?", v.Id). Where("neck_ring_number = ?", v.NeckRingNumber). Where("change_filter = ?", model.InitChangeFilter). Updates(v).Error; err != nil { return xerr.WithStack(err) } filterValues[v.NeckRingNumber] = v } return nil } // ActivityVolumeChanges 计算活动量变化趋势校正值(活跃度校正) func (e *Entry) ActivityVolumeChanges(pastureId int64, xToday *crontab.XToday) error { currDate, _ := time.Parse(model.LayoutDate2, xToday.XBegDate) XEndDateTime, _ := time.Parse(model.LayoutDate2, xToday.XEndDate) xframeId := int64(0) maxXframeId := int64(11) dayTimes := int64(1) for currDate.Format(model.LayoutDate2) < XEndDateTime.Format(model.LayoutDate2) || (currDate == XEndDateTime && xframeId <= maxXframeId) { activityVolumeList := make([]*crontab.ActivityVolume, 0) activeTime := fmt.Sprintf("%s %02d:00:00", currDate.Format(model.LayoutDate2), xframeId*2) activeTimeParse, err := time.Parse(model.LayoutTime, activeTime) if err != nil { return xerr.WithStack(err) } if dayTimes == 1 { if err = e.DB.Model(new(model.NeckActiveHabit)). Select("neck_ring_number"). Select("AVG(IF(change_filter>=60, 60, change_filter)) as avg_filter"). Select("ROUND(STD(IF(change_filter>=60, 60, change_filter))) as std_filter"). Select("COUNT(1) as nb"). Where("id BETWEEN ? AND ?", xToday.XMin7Id, xToday.CurrMaxHabitId). Where("heat_date BETWEEN ? AND ?", currDate.AddDate(0, 0, -7).Format(model.LayoutDate2), currDate.AddDate(0, 0, -1).Format(model.LayoutDate2)). Where("frameid = ?", xframeId). Where("pasture_id = ?", pastureId). Where(e.DB.Where("high > ?", xToday.ActiveLowest).Or("rumina >= ?", xToday.RuminaLowest)). Where("active_time <= ?", activeTimeParse.Add(-12*time.Hour)). Where("change_filter > ?", MinChangeFilter). Having("nb > ?", DefaultNb). Group("neck_ring_number"). Find(&activityVolumeList).Error; err != nil { return xerr.WithStack(err) } } for _, v := range activityVolumeList { filterCorrect := model.DefaultFilterCorrect - int(math.Floor(float64(v.AvgFilter)/3+float64(v.StdFilter)/2)) if err = e.DB.Model(new(model.NeckActiveHabit)). Where("neck_ring_number = ?", v.NeckRingNumber). Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId). Where("frameid = ?", xframeId). Where("head_date = ?", currDate.Format(model.LayoutDate2)). Update("filter_correct", filterCorrect).Error; err != nil { return xerr.WithStack(err) } } /*n := 0 if n <= 10 { // todo n += 2 }*/ zaplog.Info("ActivityVolumeChanges", zap.Any("xToday", xToday), zap.Any("currDate", currDate.Format(model.LayoutDate2)), zap.Any("xframeId", xframeId), zap.Any("activityVolumeList", activityVolumeList), ) if err = e.DB.Model(new(model.NeckActiveHabit)). Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId). Where("heat_date = ?", currDate.Format(model.LayoutDate2)). Where("frameid = ?", xframeId). Where("change_filter = ?", model.InitChangeFilter). Updates(map[string]interface{}{ "change_filter": MinChangeFilter, "rumina_filter": MinRuminaFilter, "chew_filter": MinChewFilter, "filter_correct": model.DefaultFilterCorrect, }).Error; err != nil { return xerr.WithStack(err) } if xframeId == maxXframeId { xframeId = 0 currDate = currDate.AddDate(0, 0, 1) dayTimes = 1 } else { xframeId++ dayTimes++ } /*// 更新评分 newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0) if err = e.DB.Model(new(model.NeckActiveHabit)). Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId). Where("heat_date = ?", currDate.Format(model.LayoutDate2)). Where("frameid = ?", xframeId). Where("score = ?", 0). Find(&newNeckActiveHabitList).Error; err != nil { return xerr.WithStack(err) }*/ // todo 待开发 } zaplog.Info("EntryUpdateActiveHabit-ActivityVolumeChanges-Success") return nil } // 辅助函数来计算过滤值 func computeIfPositiveElse(newValue, prevFilterValue float64, weightPrev, weightNew float64) float64 { return math.Ceil((prevFilterValue * weightPrev) + (weightNew * newValue)) }