|
- package mqtt
- import (
- "fmt"
- "kpt-pasture/model"
- "kpt-pasture/module/crontab"
- "math"
- "time"
- "gitee.com/xuyiping_admin/pkg/logger/zaplog"
- "go.uber.org/zap"
- pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
- "gitee.com/xuyiping_admin/pkg/xerr"
- )
// Sentinel values for the trend-filter columns of neck_active_habit.
// ActivityVolumeChanges writes them into change_filter / rumina_filter /
// chew_filter, and UpdateChangeFilter treats a value at or below the sentinel
// as "not computed yet". MinChangeHigh is the exclusive lower bound applied to
// change_high when UpdateChangeFilter selects rows.
const (
	MinChangeFilter = -99
	MinRuminaFilter = -99
	MinChewFilter   = -99
	MinChangeHigh   = -99
)

// DefaultNb is the sample count a collar must exceed (HAVING nb > DefaultNb)
// before ActivityVolumeChanges derives a filter correction for it.
const DefaultNb = 30
- func (e *Entry) PastureUpdateActiveHabit() {
- pastureList := e.FindPastureList()
- if pastureList == nil || len(pastureList) == 0 {
- return
- }
- for _, pasture := range pastureList {
- if err := e.EntryUpdateActiveHabit(pasture.Id); err != nil {
- zaplog.Error("PastureUpdateActiveHabit", zap.Any("PastureUpdateActiveHabit", err), zap.Any("pasture", pasture))
- }
- zaplog.Info(fmt.Sprintf("PastureUpdateActiveHabit Success %d", pasture.Id))
- }
- }
- func (e *Entry) EntryUpdateActiveHabit(pastureId int64) (err error) {
- lastMaxHabitData, err := e.GetSystemConfigure2(pastureId, model.MaxHabit)
- if err != nil {
- return xerr.WithStack(err)
- }
- lastMaxHabitId := lastMaxHabitData.Value
- currentMaxHabit := &model.NeckActiveHabit{}
- if err = e.DB.Model(currentMaxHabit).
- Where("id > ?", lastMaxHabitId).
- Order("id desc").First(currentMaxHabit).Error; err != nil {
- return xerr.WithStack(err)
- }
- // 本次执行<=上次执行的id,则不执行
- if currentMaxHabit.Id < lastMaxHabitId {
- return nil
- }
- // 统一更新is_max_time为0
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Where("is_max_time = ?", pasturePb.IsShow_Ok).
- Where("heat_date > ?", time.Now().AddDate(0, 0, -10).Format(model.LayoutDate2)).
- Update("is_max_time", pasturePb.IsShow_No).Error; err != nil {
- return xerr.WithStack(err)
- }
- // 获取这段执行数据内最大日期和最小日期
- xToday := &crontab.XToday{}
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Select(`MIN(heat_date) as x_beg_date, MAX(heat_date) as x_end_date`).
- Where("id BETWEEN ? AND ?", lastMaxHabitId, currentMaxHabit.Id).
- First(xToday).Error; err != nil {
- return xerr.WithStack(err)
- }
- xToday.LastMaxHabitId = lastMaxHabitId
- xToday.CurrMaxHabitId = currentMaxHabit.Id
- minHeatDateParse, err := time.Parse(model.LayoutDate2, xToday.XBegDate)
- if err != nil {
- return xerr.WithStack(err)
- }
- xBefore2Day := minHeatDateParse.AddDate(0, 0, -1).Format(model.LayoutDate2)
- xBefore7Day := minHeatDateParse.AddDate(0, 0, -7).Format(model.LayoutDate2)
- xMin2Id, err := e.GetMinIdByHeatDate(xBefore2Day, xToday.LastMaxHabitId)
- if err != nil {
- return xerr.WithStack(err)
- }
- xMin7Id, err := e.GetMinIdByHeatDate(xBefore7Day, xToday.LastMaxHabitId)
- if err != nil {
- return xerr.WithStack(err)
- }
- defer func() {
- // 更新最后一次执行的id值
- if err == nil {
- e.DB.Model(new(model.SystemConfigure)).
- Where("name = ?", model.MaxHabit).
- Where("pasture_id = ?", pastureId).
- Update("value", currentMaxHabit.Id)
- }
- }()
- xToday.XMin2Id = xMin2Id
- xToday.XMin7Id = xMin7Id
- // id到上一次执行结果并且heat_date > 7天之前的最大牛只id置为is_max_time=1
- maxHabitIdArray := make([]*model.MaxHabitIdModel, 0)
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Select("Max(id) as id").
- Where("change_filter > ?", model.DefaultChangeFilter).
- Where("heat_date > ?", xBefore7Day).
- Group("neck_ring_number").
- Find(&maxHabitIdArray).Error; err != nil {
- return xerr.WithStack(err)
- }
- if len(maxHabitIdArray) > 0 {
- maxHabitIds := make([]int64, 0)
- for _, v := range maxHabitIdArray {
- maxHabitIds = append(maxHabitIds, v.Id)
- }
- if err = e.DB.Model(new(model.NeckActiveHabit).TableName()).
- Where("id IN (?)", maxHabitIds).
- Update("is_max_time", pasturePb.IsShow_Ok).Error; err != nil {
- return xerr.WithStack(err)
- }
- }
- activeLowest, err := e.GetSystemConfigure2(pastureId, model.ActiveLowest)
- if err != nil {
- return xerr.WithStack(err)
- }
- ruminaLowest, err := e.GetSystemConfigure2(pastureId, model.RuminaLowest)
- if err != nil {
- return xerr.WithStack(err)
- }
- xToday.ActiveLowest = activeLowest.Value
- xToday.RuminaLowest = ruminaLowest.Value
- // 更新活动滤波
- if err = e.FilterUpdate(pastureId, xToday); err != nil {
- zaplog.Error("EntryUpdateActiveHabit", zap.Any("FilterUpdate", err), zap.Any("xToday", xToday))
- return xerr.WithStack(err)
- }
- // 更新周平均值
- if err = e.WeeklyActiveAvgUpdate(pastureId, xToday); err != nil {
- zaplog.Error("EntryUpdateActiveHabit", zap.Any("WeeklyActiveAvgUpdate", err), zap.Any("xToday", xToday))
- return xerr.WithStack(err)
- }
- if err = e.ActivityVolumeChanges(pastureId, xToday); err != nil {
- zaplog.Error("EntryUpdateActiveHabit", zap.Any("ActivityVolumeChanges", err), zap.Any("xToday", xToday))
- return xerr.WithStack(err)
- }
- return nil
- }
- // FilterUpdate 更新活动滤波
- func (e *Entry) FilterUpdate(pastureId int64, xToDay *crontab.XToday) error {
- newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0)
- if err := e.DB.Model(new(model.NeckActiveHabit)).
- Where("pasture_id = ?", pastureId).
- Where(e.DB.Where("change_filter = ?", model.InitChangeFilter).Or("is_max_time = ?", pasturePb.IsShow_Ok)).
- Where(e.DB.Where("high >= ?", xToDay.ActiveLowest).Or("rumina >= ?", xToDay.RuminaLowest)).
- Order("neck_ring_number,id").
- Find(&newNeckActiveHabitList).Error; err != nil {
- return xerr.WithStack(err)
- }
- var filterValues = make(map[string]*model.NeckActiveHabit)
- // 活动量滤波
- for _, v := range newNeckActiveHabitList {
- prev, ok := filterValues[v.NeckRingNumber]
- if !ok {
- if v.FilterHigh <= 0 {
- v.FilterHigh = v.High
- }
- if v.FilterRumina <= 0 {
- v.FilterRumina = v.Rumina
- }
- if v.FilterChew <= 0 {
- v.FilterChew = v.Rumina + v.Intake
- }
- filterValues[v.NeckRingNumber] = v
- continue
- }
- if v.FilterHigh <= 0 {
- v.FilterHigh = int32(computeIfPositiveElse(float64(v.High), float64(prev.FilterHigh), 0.23, 0.77))
- }
- if v.FilterRumina <= 0 {
- v.FilterRumina = int32(computeIfPositiveElse(float64(v.Rumina), float64(prev.FilterRumina), 0.33, 0.67))
- }
- if v.FilterChew <= 0 {
- v.FilterChew = int32(computeIfPositiveElse(float64(v.Rumina+v.Intake), float64(prev.FilterChew), 0.33, 0.67))
- }
- // 更新过滤值
- filterValues[v.NeckRingNumber] = v
- if err := e.DB.Model(new(model.NeckActiveHabit)).
- Select("filter_high", "filter_rumina", "filter_chew").
- Where("id = ?", v.Id).
- Updates(v).Error; err != nil {
- return xerr.WithStack(err)
- }
- }
- zaplog.Info("EntryUpdateActiveHabit-FilterUpdate-Success")
- return nil
- }
- func (e *Entry) WeeklyActiveAvgUpdate(pastureId int64, xToday *crontab.XToday) error {
- beginDayDate, err := time.Parse(model.LayoutDate2, xToday.XBegDate)
- if err != nil {
- return xerr.WithStack(err)
- }
- before7DayDate := beginDayDate.AddDate(0, 0, -7).Format(model.LayoutDate2)
- before1DayDate := beginDayDate.AddDate(0, 0, -1).Format(model.LayoutDate2)
- weeklyActive, err := e.GetSystemConfigure2(pastureId, model.WeeklyActive)
- if err != nil {
- return xerr.WithStack(err)
- }
- xframeId := int64(0)
- maxXframeId := int64(11)
- xStartDate, _ := time.Parse(model.LayoutDate2, xToday.XBegDate)
- xEndDate, _ := time.Parse(model.LayoutDate2, xToday.XEndDate)
- for xStartDate.Format(model.LayoutDate2) < xEndDate.Format(model.LayoutDate2) || (xStartDate == xEndDate && xframeId <= maxXframeId) {
- // 时间点周平均
- AvgHabitList := make([]*crontab.AvgHabit, 0)
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Select("neck_ring_number").
- Select("IF(COUNT(1)>=3, ROUND((SUM(filter_high) -MIN(filter_high) -MAX(filter_high))/ABS(COUNT(1) -2),0), -1) as avg_high_habit").
- Select("IF(COUNT(1)>=3, ROUND((SUM(filter_rumina) -MIN(filter_rumina) -MAX(filter_rumina))/ABS(COUNT(1) -2),0), -1) as avg_rumina_habit").
- Select("IF(COUNT(1)>=3, ROUND((SUM(filter_chew) -MIN(filter_chew) -MAX(filter_chew))/ABS(COUNT(1) -2),0), -1) as avg_chew_habit").
- Select("ROUND(AVG(intake),0) as avg_intake_habit").
- Select("ROUND(AVG(inactive),0) as avg_inactive_habit").
- Where("id BETWEEN ? AND ?", xToday.XMin7Id, xToday.CurrMaxHabitId).
- Where("heat_date BETWEEN ? AND ?", before7DayDate, before1DayDate).
- Where("frameid = ?", xframeId).
- Where("pasture_id = ?", pastureId).
- Where("change_filter = ?", model.InitChangeFilter).
- Where(e.DB.Where("high > ?", xToday.ActiveLowest).Or("rumina > ?", xToday.RuminaLowest)).
- Group("neck_ring_number").
- Find(&AvgHabitList).Error; err != nil {
- return xerr.WithStack(err)
- }
- for _, v := range AvgHabitList {
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Select("avg_high_habit", "avg_rumina_habit", "avg_chew_habit", "avg_intake_habit", "avg_inactive_habit").
- Where("neck_ring_number = ?", v.NeckRingNumber).
- Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
- Where("frameid = ?", xframeId).
- Where("change_filter = ?", model.InitChangeFilter).
- Where("heat_date = ?", xStartDate).
- Updates(v).Error; err != nil {
- return xerr.WithStack(err)
- }
- }
- // 累计24小时数值
- sumHabitList := make([]*crontab.SumHabit, 0)
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Select("neck_ring_number").
- Select("IF(COUNT(1)>6, ROUND(AVG( h2.filter_rumina)*12,0), 0) as sum_rumina").
- Select("IF(COUNT(1)>6, ROUND(AVG( h2.intake)*12,0), 0) as sum_intake").
- Select("IF(COUNT(1)>6, ROUND(AVG( h2.inactive)*12,0), 0) as sum_inactive").
- Select("IF(COUNT(1)>6, ROUND(AVG( h2.active)*12,0), 0) as sum_active").
- Select("MAX(h2.change_filter) as sum_max_high").
- Select("MIN(IF(change_filter > ?, change_filter, 0)) as sum_min_high", MinChangeFilter).
- Select("MIN( CASE WHEN filter_chew > ? THEN filter_chew WHEN filter_rumina >= ? THEN filter_rumina ELSE 0 END) as sum_min_chew", MinChangeFilter, MinRuminaFilter).
- Where("id BETWEEN ? AND ?", xToday.XMin2Id, xToday.CurrMaxHabitId).
- Where("pasture_id = ?", pastureId).
- Where("heat_date BETWEEN ? AND ?", xStartDate.AddDate(0, 0, -1).Format(model.LayoutDate2), xStartDate.Format(model.LayoutDate2)).
- Where("created_at BETWEEN ? AND ?", xStartDate.Add(-23*time.Hour).Unix(), xStartDate.Unix()).
- Where(e.DB.Where("high > ?", xToday.ActiveLowest).Or("rumina >= ?", xToday.RuminaLowest)).
- Group("neck_ring_number").
- Find(&sumHabitList).Error; err != nil {
- return xerr.WithStack(err)
- }
- for _, v := range sumHabitList {
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Select("sum_rumina", "sum_intake", "sum_inactive", "sum_active", "sum_max_high", "sum_min_high", "sum_min_chew").
- Where("neck_ring_number = ?", v.NeckRingNumber).
- Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
- Where("heat_date = ?", xStartDate.Format(model.LayoutDate2)).
- Where("frameid = ?", xframeId).
- Where("change_filter = ?", model.InitChangeFilter).
- Updates(v).Error; err != nil {
- return xerr.WithStack(err)
- }
- }
- // 变化百分比
- changeHabitList := make([]*model.NeckActiveHabit, 0)
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
- Where("heat_date = ?", xStartDate.Format(model.LayoutDate2)).
- Where("frameid = ?", xframeId).
- Where("pasture_id = ?", pastureId).
- Where("change_filter = ?", model.InitChangeFilter).
- Where("avg_high_habit > ?", 0).
- Where(e.DB.Where("high > ?", xToday.ActiveLowest).Or("rumina >= ?", xToday.RuminaLowest)).
- Find(&changeHabitList).Error; err != nil {
- return xerr.WithStack(err)
- }
- for _, v := range changeHabitList {
- if v.FilterHigh-v.AvgHighHabit > 0 {
- v.ChangeHigh = (v.FilterHigh - v.AvgHighHabit) / int32(float64(v.WeekHigh)*0.6+float64(v.AvgHighHabit)*0.2+float64(weeklyActive.Value)*0.2)
- } else {
- v.ChangeHigh = v.FilterHigh - v.AvgHighHabit/v.AvgHighHabit*100
- }
- v.ChangeRumina = v.RuminaFilter - v.AvgRuminaHabit/v.AvgHighHabit*100
- v.ChangeChew = v.FilterChew - v.AvgChewHabit/v.AvgHighHabit*100
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Select("change_high", "change_rumina", "change_chew").
- Where("id = ?", v.Id).
- Updates(v).Error; err != nil {
- return xerr.WithStack(err)
- }
- }
- if xframeId == maxXframeId {
- xframeId = 0
- xStartDate = xStartDate.AddDate(0, 0, 1)
- } else {
- xframeId++
- }
- }
- zaplog.Info("EntryUpdateActiveHabit-WeeklyActiveAvgUpdate-Success")
- return nil
- }
- // UpdateChangeFilter 变化趋势滤波
- func (e *Entry) UpdateChangeFilter(pastureId int64, xToday *crontab.XToday) (err error) {
- xRuminaDisc, err := e.GetSystemConfigure2(pastureId, model.XRuminaDisc)
- if err != nil {
- return err
- }
- xChangeDiscount, err := e.GetSystemConfigure2(pastureId, model.XChangeDiscount)
- if err != nil {
- return err
- }
- newChangeFilterList := make([]*crontab.ChangeFilterData, 0)
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Select("id,neck_ring_number,change_high,change_filter,rumina_filter,change_rumina,chew_filter,change_chew").
- Select("IF(lact=0,0.8,1) as xlc_dis_count").
- Where("id BETWEEN ? AND ?", xToday.XMin2Id, xToday.CurrMaxHabitId).
- Where("pasture_id = ?", pastureId).
- Where(e.DB.Where("change_filter = ?", model.InitChangeFilter).Or("is_max_time = ?", pasturePb.IsShow_Ok)).
- Where("change_high > ?", MinChangeHigh).
- Order("neck_ring_number,heat_date,frameid").
- Find(&newChangeFilterList).Error; err != nil {
- return xerr.WithStack(err)
- }
- var filterValues = make(map[string]*crontab.ChangeFilterData)
- for _, v := range newChangeFilterList {
- prev, ok := filterValues[v.NeckRingNumber]
- if v.ChangeFilter <= MinChangeFilter {
- prefChangeFilter := int32(0)
- if ok {
- prefChangeFilter = prev.ChangeFilter
- }
- leastValue := v.HighChange
- if prefChangeFilter < v.HighChange {
- leastValue = prefChangeFilter
- }
- v.ChangeFilter = int32(float64(prefChangeFilter)*(1-(float64(xChangeDiscount.Value)/10)*v.XlcDisCount) +
- float64(leastValue)*(float64(xChangeDiscount.Value)/10)*v.XlcDisCount)
- }
- if v.RuminaFilter <= MinChangeFilter {
- prefRuminaFilter := int32(0)
- if ok {
- prefRuminaFilter = prev.RuminaFilter
- }
- factor := float64(1)
- if math.Abs(float64(v.ChangeRumina)) > 60 {
- factor = 0.5
- }
- v.RuminaFilter = int32(float64(prefRuminaFilter)*(1-float64(xRuminaDisc.Value/10)*v.XlcDisCount*factor) +
- float64(v.ChangeRumina)*float64(xRuminaDisc.Value)/10*v.XlcDisCount*factor)
- }
- if v.RuminaFilter > 50 {
- v.RuminaFilter = 50
- }
- if v.ChewFilter <= MinChangeFilter {
- prefChewFilter := int32(0)
- if ok {
- prefChewFilter = prev.ChewFilter
- }
- factor := float64(1)
- if math.Abs(float64(v.ChangeChew)) > 60 {
- factor = 0.5
- }
- v.ChewFilter = int32(float64(prefChewFilter)*(1-float64(xRuminaDisc.Value)/10*factor) +
- float64(v.ChangeChew)*float64(xRuminaDisc.Value)/10*factor)
- }
- if v.ChewFilter > 50 {
- v.ChangeChew = 50
- }
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Select("change_filter", "rumina_filter", "chew_filter").
- Where("id = ?", v.Id).
- Where("neck_ring_number = ?", v.NeckRingNumber).
- Where("change_filter = ?", model.InitChangeFilter).
- Updates(v).Error; err != nil {
- return xerr.WithStack(err)
- }
- filterValues[v.NeckRingNumber] = v
- }
- return nil
- }
- // ActivityVolumeChanges 计算活动量变化趋势校正值(活跃度校正)
- func (e *Entry) ActivityVolumeChanges(pastureId int64, xToday *crontab.XToday) error {
- currDate, _ := time.Parse(model.LayoutDate2, xToday.XBegDate)
- XEndDateTime, _ := time.Parse(model.LayoutDate2, xToday.XEndDate)
- xframeId := int64(0)
- maxXframeId := int64(11)
- dayTimes := int64(1)
- for currDate.Format(model.LayoutDate2) < XEndDateTime.Format(model.LayoutDate2) || (currDate == XEndDateTime && xframeId <= maxXframeId) {
- activityVolumeList := make([]*crontab.ActivityVolume, 0)
- activeTime := fmt.Sprintf("%s %02d:00:00", currDate.Format(model.LayoutDate2), xframeId*2)
- activeTimeParse, err := time.Parse(model.LayoutTime, activeTime)
- if err != nil {
- return xerr.WithStack(err)
- }
- if dayTimes == 1 {
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Select("neck_ring_number").
- Select("AVG(IF(change_filter>=60, 60, change_filter)) as avg_filter").
- Select("ROUND(STD(IF(change_filter>=60, 60, change_filter))) as std_filter").
- Select("COUNT(1) as nb").
- Where("id BETWEEN ? AND ?", xToday.XMin7Id, xToday.CurrMaxHabitId).
- Where("heat_date BETWEEN ? AND ?", currDate.AddDate(0, 0, -7).Format(model.LayoutDate2), currDate.AddDate(0, 0, -1).Format(model.LayoutDate2)).
- Where("frameid = ?", xframeId).
- Where("pasture_id = ?", pastureId).
- Where(e.DB.Where("high > ?", xToday.ActiveLowest).Or("rumina >= ?", xToday.RuminaLowest)).
- Where("active_time <= ?", activeTimeParse.Add(-12*time.Hour)).
- Where("change_filter > ?", MinChangeFilter).
- Having("nb > ?", DefaultNb).
- Group("neck_ring_number").
- Find(&activityVolumeList).Error; err != nil {
- return xerr.WithStack(err)
- }
- }
- for _, v := range activityVolumeList {
- filterCorrect := model.DefaultFilterCorrect - int(math.Floor(float64(v.AvgFilter)/3+float64(v.StdFilter)/2))
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Where("neck_ring_number = ?", v.NeckRingNumber).
- Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
- Where("frameid = ?", xframeId).
- Where("head_date = ?", currDate.Format(model.LayoutDate2)).
- Update("filter_correct", filterCorrect).Error; err != nil {
- return xerr.WithStack(err)
- }
- }
- /*n := 0
- if n <= 10 {
- // todo
- n += 2
- }*/
- zaplog.Info("ActivityVolumeChanges",
- zap.Any("xToday", xToday),
- zap.Any("currDate", currDate.Format(model.LayoutDate2)),
- zap.Any("xframeId", xframeId),
- zap.Any("activityVolumeList", activityVolumeList),
- )
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
- Where("heat_date = ?", currDate.Format(model.LayoutDate2)).
- Where("frameid = ?", xframeId).
- Where("change_filter = ?", model.InitChangeFilter).
- Updates(map[string]interface{}{
- "change_filter": MinChangeFilter,
- "rumina_filter": MinRuminaFilter,
- "chew_filter": MinChewFilter,
- "filter_correct": model.DefaultFilterCorrect,
- }).Error; err != nil {
- return xerr.WithStack(err)
- }
- if xframeId == maxXframeId {
- xframeId = 0
- currDate = currDate.AddDate(0, 0, 1)
- dayTimes = 1
- } else {
- xframeId++
- dayTimes++
- }
- /*// 更新评分
- newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0)
- if err = e.DB.Model(new(model.NeckActiveHabit)).
- Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
- Where("heat_date = ?", currDate.Format(model.LayoutDate2)).
- Where("frameid = ?", xframeId).
- Where("score = ?", 0).
- Find(&newNeckActiveHabitList).Error; err != nil {
- return xerr.WithStack(err)
- }*/
- // todo 待开发
- }
- zaplog.Info("EntryUpdateActiveHabit-ActivityVolumeChanges-Success")
- return nil
- }
// computeIfPositiveElse returns the weighted sum
// prevFilterValue*weightPrev + weightNew*newValue, rounded up to the next
// whole number with math.Ceil. It is the blending step used to compute the
// filter values.
func computeIfPositiveElse(newValue, prevFilterValue float64, weightPrev, weightNew float64) float64 {
	blended := (prevFilterValue * weightPrev) + (weightNew * newValue)
	return math.Ceil(blended)
}
|