|
@@ -5,6 +5,7 @@ import (
|
|
|
"kpt-pasture/model"
|
|
|
"kpt-pasture/util"
|
|
|
"math"
|
|
|
+ "sort"
|
|
|
"time"
|
|
|
|
|
|
pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
|
|
@@ -23,36 +24,21 @@ const (
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
- defaultLimit = int32(1000)
|
|
|
- isDelete bool
|
|
|
+ defaultLimit = int32(1000)
|
|
|
+ isDelete bool
|
|
|
+ mergeIsRunning bool
|
|
|
+ calculateIsRunning bool
|
|
|
)
|
|
|
|
|
|
// NeckRingOriginalMergeData 把脖环数据合并成2个小时的
|
|
|
-func (e *Entry) NeckRingOriginalMergeData() error {
|
|
|
- var (
|
|
|
- err error
|
|
|
- )
|
|
|
- limit := e.Cfg.NeckRingLimit
|
|
|
- if limit <= 0 {
|
|
|
- limit = defaultLimit
|
|
|
- }
|
|
|
- newTime := time.Now()
|
|
|
- //createdAt := newTime.Add(-1 * time.Hour)
|
|
|
-
|
|
|
- neckRingList := make([]*model.NeckRingOriginal, 0)
|
|
|
- if err = e.DB.Model(new(model.NeckRingOriginal)).
|
|
|
- Where("is_show = ?", pasturePb.IsShow_No).
|
|
|
- Where("neck_ring_number IN (?)", "10027").
|
|
|
- //Where("created_at <= ?", createdAt.Unix()).
|
|
|
- Order("active_date,neck_ring_number,frameid").
|
|
|
- Limit(int(limit)).Find(&neckRingList).Error; err != nil {
|
|
|
- return xerr.WithStack(err)
|
|
|
- }
|
|
|
- if len(neckRingList) <= 0 {
|
|
|
+func (e *Entry) NeckRingOriginalMergeData() (err error) {
|
|
|
+ if mergeIsRunning {
|
|
|
return nil
|
|
|
}
|
|
|
-
|
|
|
+ newTime := time.Now()
|
|
|
+ mergeIsRunning = true
|
|
|
defer func() {
|
|
|
+ mergeIsRunning = false
|
|
|
if newTime.Day()%15 == 0 && !isDelete {
|
|
|
// 原始数据删除15天前的
|
|
|
e.DB.Model(new(model.NeckRingOriginal)).
|
|
@@ -68,6 +54,28 @@ func (e *Entry) NeckRingOriginalMergeData() error {
|
|
|
isDelete = true
|
|
|
}
|
|
|
}()
|
|
|
+ limit := e.Cfg.NeckRingLimit
|
|
|
+ if limit <= 0 {
|
|
|
+ limit = defaultLimit
|
|
|
+ }
|
|
|
+
|
|
|
+ //createdAt := newTime.Add(-1 * time.Hour)
|
|
|
+ //neckRingNumber := []string{"10026", "10027", "10028", "10029", "10030", "10031", "10032", "10033", "10034", "10035"}
|
|
|
+ neckRingNumber := []string{"10027"}
|
|
|
+
|
|
|
+ neckRingList := make([]*model.NeckRingOriginal, 0)
|
|
|
+ if err = e.DB.Model(new(model.NeckRingOriginal)).
|
|
|
+ Where("is_show = ?", pasturePb.IsShow_No).
|
|
|
+ Where("neck_ring_number IN (?)", neckRingNumber).
|
|
|
+ //Where("created_at <= ?", createdAt.Unix()).
|
|
|
+ Order("active_date,frameid,neck_ring_number").
|
|
|
+ Limit(int(limit)).Find(&neckRingList).Error; err != nil {
|
|
|
+ return xerr.WithStack(err)
|
|
|
+ }
|
|
|
+ if len(neckRingList) <= 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
// 计算合并
|
|
|
neckActiveHabitList := e.recalculate(neckRingList)
|
|
|
if len(neckActiveHabitList) <= 0 {
|
|
@@ -92,24 +100,14 @@ func (e *Entry) NeckRingOriginalMergeData() error {
|
|
|
}
|
|
|
continue
|
|
|
}
|
|
|
-
|
|
|
- if historyNeckActiveHabit == nil || historyNeckActiveHabit.Id <= 0 {
|
|
|
- zaplog.Error("NeckRingOriginalMergeData-3",
|
|
|
- zap.Any("ct", ct),
|
|
|
- zap.Any("historyNeckActiveHabit", historyNeckActiveHabit),
|
|
|
- zap.Any("neckActiveHabit", habit),
|
|
|
- )
|
|
|
- continue
|
|
|
- }
|
|
|
-
|
|
|
// 重新计算
|
|
|
newNeckActiveHabit := e.againRecalculate(historyNeckActiveHabit)
|
|
|
if newNeckActiveHabit != nil {
|
|
|
if err = e.DB.Model(new(model.NeckActiveHabit)).
|
|
|
- Select("rumina", "intake", "inactive", "gasp", "other", "high", "active").
|
|
|
+ Select("rumina", "intake", "inactive", "gasp", "other", "high", "active", "is_show").
|
|
|
Where("id = ?", historyNeckActiveHabit.Id).
|
|
|
Updates(newNeckActiveHabit).Error; err != nil {
|
|
|
- zaplog.Error("NeckRingOriginalMergeData-5",
|
|
|
+ zaplog.Error("NeckRingOriginalMergeData-3",
|
|
|
zap.Any("err", err),
|
|
|
zap.Any("ct", ct),
|
|
|
zap.Any("historyNeckActiveHabit", historyNeckActiveHabit),
|
|
@@ -127,11 +125,19 @@ func (e *Entry) NeckRingCalculate() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+ if calculateIsRunning {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ defer func() {
|
|
|
+ calculateIsRunning = false
|
|
|
+ }()
|
|
|
+
|
|
|
+ calculateIsRunning = true
|
|
|
for _, pasture := range pastureList {
|
|
|
if err := e.EntryUpdateActiveHabit(pasture.Id); err != nil {
|
|
|
- zaplog.Error("PastureUpdateActiveHabit", zap.Any("err", err), zap.Any("pasture", pasture))
|
|
|
+ zaplog.Error("NeckRingCalculate", zap.Any("err", err), zap.Any("pasture", pasture))
|
|
|
}
|
|
|
- zaplog.Info(fmt.Sprintf("PastureUpdateActiveHabit Success %d", pasture.Id))
|
|
|
+ zaplog.Info(fmt.Sprintf("NeckRingCalculate Success %d", pasture.Id))
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
@@ -167,7 +173,9 @@ func (e *Entry) recalculate(neckRingList []*model.NeckRingOriginal) []*model.Nec
|
|
|
}
|
|
|
v.SumAvg()
|
|
|
}
|
|
|
- return model.NeckRingOriginalMap(originalMapData).ForMatData()
|
|
|
+ dataList := model.NeckRingOriginalMap(originalMapData).ForMatData()
|
|
|
+ sort.Sort(model.NeckActiveHabitSlice(dataList))
|
|
|
+ return dataList
|
|
|
}
|
|
|
|
|
|
func (e *Entry) againRecalculate(data *model.NeckActiveHabit) *model.NeckActiveHabit {
|
|
@@ -185,7 +193,12 @@ func (e *Entry) againRecalculate(data *model.NeckActiveHabit) *model.NeckActiveH
|
|
|
if len(newDataList) != 1 {
|
|
|
return nil
|
|
|
}
|
|
|
- return newDataList[0]
|
|
|
+ res := newDataList[0]
|
|
|
+ if res.Id <= 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ res.IsShow = pasturePb.IsShow_No
|
|
|
+ return res
|
|
|
}
|
|
|
|
|
|
func (e *Entry) EntryUpdateActiveHabit(pastureId int64) (err error) {
|
|
@@ -236,19 +249,26 @@ func (e *Entry) EntryUpdateActiveHabit(pastureId int64) (err error) {
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
+ var processIds []int64
|
|
|
// 更新活动滤波
|
|
|
- if err = e.FirstFilterUpdate(pastureId, xToday); err != nil {
|
|
|
- zaplog.Error("EntryUpdateActiveHabit", zap.Any("FirstFilterUpdate", err), zap.Any("xToday", xToday))
|
|
|
+ processIds, err = e.FirstFilterUpdate(pastureId, xToday)
|
|
|
+ if err != nil {
|
|
|
+ zaplog.Error("NeckRingCalculate", zap.Any("FirstFilterUpdate", err), zap.Any("xToday", xToday))
|
|
|
}
|
|
|
|
|
|
- // 二次更新滤波
|
|
|
- if err = e.SecondUpdateChangeFilter(pastureId, xToday); err != nil {
|
|
|
- zaplog.Error("EntryUpdateActiveHabit", zap.Any("SecondUpdateChangeFilter", err), zap.Any("xToday", xToday))
|
|
|
+ if len(processIds) > 0 {
|
|
|
+ if err = e.WeeklyUpdateActiveHabit(pastureId, processIds, xToday); err != nil {
|
|
|
+ zaplog.Error("NeckRingCalculate", zap.Any("WeeklyUpdateActiveHabit", err), zap.Any("xToday", xToday))
|
|
|
+ }
|
|
|
+ // 二次更新滤波
|
|
|
+ if err = e.SecondUpdateChangeFilter(pastureId, xToday); err != nil {
|
|
|
+ zaplog.Error("NeckRingCalculate", zap.Any("SecondUpdateChangeFilter", err), zap.Any("xToday", xToday))
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// 活动量校正系数和健康评分
|
|
|
if err = e.FilterCorrectAndScoreUpdate(pastureId, xToday); err != nil {
|
|
|
- zaplog.Error("EntryUpdateActiveHabit", zap.Any("ActivityVolumeChanges", err), zap.Any("xToday", xToday))
|
|
|
+ zaplog.Error("NeckRingCalculate", zap.Any("ActivityVolumeChanges", err), zap.Any("xToday", xToday))
|
|
|
}
|
|
|
|
|
|
if err = e.DB.Model(new(model.NeckActiveHabit)).
|
|
@@ -267,12 +287,10 @@ func (e *Entry) EntryUpdateActiveHabit(pastureId int64) (err error) {
|
|
|
if err = e.DB.Model(new(model.NeckActiveHabit)).
|
|
|
Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
|
|
|
Where("pasture_id = ?", pastureId).
|
|
|
- Where("is_show = ?", pasturePb.IsShow_No).
|
|
|
Where("change_filter < ?", 0).
|
|
|
Where("filter_correct < ?", model.DefaultFilterCorrect).
|
|
|
Updates(map[string]interface{}{
|
|
|
"filter_correct": model.DefaultFilterCorrect,
|
|
|
- "is_show": pasturePb.IsShow_Ok,
|
|
|
}).Error; err != nil {
|
|
|
zaplog.Error("EntryUpdateActiveHabit", zap.Any("filter_correct", err), zap.Any("xToday", xToday))
|
|
|
}
|
|
@@ -285,17 +303,16 @@ func (e *Entry) EntryUpdateActiveHabit(pastureId int64) (err error) {
|
|
|
}
|
|
|
|
|
|
// FirstFilterUpdate 首次更新活动滤波
|
|
|
-func (e *Entry) FirstFilterUpdate(pastureId int64, xToDay *XToday) (err error) {
|
|
|
+func (e *Entry) FirstFilterUpdate(pastureId int64, xToDay *XToday) (processIds []int64, err error) {
|
|
|
newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0)
|
|
|
if err = e.DB.Model(new(model.NeckActiveHabit)).
|
|
|
Where("id BETWEEN ? AND ?", xToDay.LastMaxHabitId, xToDay.CurrMaxHabitId).
|
|
|
Where("pasture_id = ?", pastureId).
|
|
|
Where("is_show = ?", pasturePb.IsShow_No).
|
|
|
- Where("change_filter = ?", model.InitChangeFilter).
|
|
|
Where(e.DB.Where("high >= ?", xToDay.High).Or("rumina >= ?", xToDay.Rumina)).
|
|
|
Order("heat_date,neck_ring_number,frameid").
|
|
|
Limit(int(defaultLimit)).Find(&newNeckActiveHabitList).Error; err != nil {
|
|
|
- return xerr.WithStack(err)
|
|
|
+ return nil, xerr.WithStack(err)
|
|
|
}
|
|
|
// 活动量滤波
|
|
|
for _, v := range newNeckActiveHabitList {
|
|
@@ -343,6 +360,42 @@ func (e *Entry) FirstFilterUpdate(pastureId int64, xToDay *XToday) (err error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ processIds = append(processIds, v.Id)
|
|
|
+
|
|
|
+ // 更新过滤值
|
|
|
+ if err = e.DB.Model(new(model.NeckActiveHabit)).
|
|
|
+ Select(
|
|
|
+ "filter_high", "filter_rumina", "filter_chew", "cow_id", "lact", "calving_age",
|
|
|
+ ).Where("id = ?", v.Id).
|
|
|
+ Updates(map[string]interface{}{
|
|
|
+ "filter_high": firstFilterData.FilterHigh,
|
|
|
+ "filter_rumina": firstFilterData.FilterRumina,
|
|
|
+ "filter_chew": firstFilterData.FilterChew,
|
|
|
+ "cow_id": cowInfo.Id,
|
|
|
+ "lact": cowInfo.Lact,
|
|
|
+ "calving_age": cowInfo.CalvingAge,
|
|
|
+ }).Error; err != nil {
|
|
|
+ zaplog.Error("FirstFilterUpdate",
|
|
|
+ zap.Any("error", err),
|
|
|
+ zap.Any("firstFilterData", firstFilterData),
|
|
|
+ zap.Any("NeckActiveHabit", v),
|
|
|
+ zap.Any("cowInfo", cowInfo),
|
|
|
+ zap.Any("xToday", xToDay),
|
|
|
+ )
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return processIds, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (e *Entry) WeeklyUpdateActiveHabit(pastureId int64, processIds []int64, xToDay *XToday) (err error) {
|
|
|
+ newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0)
|
|
|
+ if err = e.DB.Model(new(model.NeckActiveHabit)).
|
|
|
+ Where("id IN (?)", processIds).
|
|
|
+ Order("heat_date,neck_ring_number,frameid").
|
|
|
+ Find(&newNeckActiveHabitList).Error; err != nil {
|
|
|
+ return xerr.WithStack(err)
|
|
|
+ }
|
|
|
+ for _, v := range newNeckActiveHabitList {
|
|
|
// 前七天的
|
|
|
weekHabitData := e.FindWeekHabitData(pastureId, v.NeckRingNumber, v.HeatDate, v.Frameid, xToDay)
|
|
|
highDiff := v.FilterHigh - weekHabitData.WeekHighHabit
|
|
@@ -370,18 +423,11 @@ func (e *Entry) FirstFilterUpdate(pastureId int64, xToDay *XToday) (err error) {
|
|
|
// 更新过滤值
|
|
|
if err = e.DB.Model(new(model.NeckActiveHabit)).
|
|
|
Select(
|
|
|
- "filter_high", "filter_rumina", "filter_chew", "cow_id", "lact", "calving_age",
|
|
|
"week_high_habit", "week_rumina_habit", "week_chew_habit", "week_intake_habit", "week_inactive_habit",
|
|
|
"sum_rumina", "sum_intake", "sum_inactive", "sum_active", "sum_max_high", "sum_min_high", "sum_min_chew",
|
|
|
"change_high", "change_rumina", "change_chew", "before_three_sum_rumina", "before_three_sum_intake",
|
|
|
).Where("id = ?", v.Id).
|
|
|
Updates(map[string]interface{}{
|
|
|
- "filter_high": firstFilterData.FilterHigh,
|
|
|
- "filter_rumina": firstFilterData.FilterRumina,
|
|
|
- "filter_chew": firstFilterData.FilterChew,
|
|
|
- "cow_id": cowInfo.Id,
|
|
|
- "lact": cowInfo.Lact,
|
|
|
- "calving_age": cowInfo.CalvingAge,
|
|
|
"week_high_habit": weekHabitData.WeekHighHabit,
|
|
|
"week_rumina_habit": weekHabitData.WeekRuminaHabit,
|
|
|
"week_chew_habit": weekHabitData.WeekChewHabit,
|
|
@@ -400,11 +446,14 @@ func (e *Entry) FirstFilterUpdate(pastureId int64, xToDay *XToday) (err error) {
|
|
|
"before_three_sum_rumina": before3DaysNeckActiveHabit.SumRumina,
|
|
|
"before_three_sum_intake": before3DaysNeckActiveHabit.SumIntake,
|
|
|
}).Error; err != nil {
|
|
|
- return xerr.WithStack(err)
|
|
|
+ zaplog.Error("WeeklyUpdateActiveHabit",
|
|
|
+ zap.Error(err),
|
|
|
+ zap.Any("NeckActiveHabit", v),
|
|
|
+ zap.Any("pastureId", pastureId),
|
|
|
+ )
|
|
|
}
|
|
|
}
|
|
|
- zaplog.Info("EntryUpdateActiveHabit-FilterUpdate-Success")
|
|
|
- return nil
|
|
|
+ return err
|
|
|
}
|
|
|
|
|
|
// SecondUpdateChangeFilter 第二次更新变化趋势滤波
|
|
@@ -493,7 +542,6 @@ func (e *Entry) SecondUpdateChangeFilter(pastureId int64, xToday *XToday) (err e
|
|
|
zaplog.Error("SecondUpdateChangeFilter-1", zap.Any("error", err), zap.Any("xToday", xToday))
|
|
|
}
|
|
|
}
|
|
|
- zaplog.Info("SecondUpdateChangeFilter-Success")
|
|
|
return nil
|
|
|
}
|
|
|
|
|
@@ -557,7 +605,6 @@ func (e *Entry) FilterCorrectAndScoreUpdate(pastureId int64, xToday *XToday) err
|
|
|
continue
|
|
|
}
|
|
|
}
|
|
|
- zaplog.Info("EntryUpdateActiveHabit-ActivityVolumeChanges-Success")
|
|
|
return nil
|
|
|
}
|
|
|
|
|
@@ -565,11 +612,6 @@ func (e *Entry) FilterCorrectAndScoreUpdate(pastureId int64, xToday *XToday) err
|
|
|
func (e *Entry) UpdateChangeAdJust(pastureId int64, xToday *XToday) error {
|
|
|
res := make([]*model.NeckRingBarChange, 0)
|
|
|
oneDayAgo := time.Now().AddDate(0, 0, -1).Format(model.LayoutDate2)
|
|
|
- //SELECT h.neck_ring_number,h.heat_date, h.frameid, c.pen_id, c.pen_name, COUNT(*) as nb, ROUND(AVG(h.change_high)) as change_high,
|
|
|
- //ROUND(AVG(h.change_filter)) as change_filter F
|
|
|
- //ROM neck_active_habit as h JOIN cow as c ON h.neck_ring_number = c.neck_ring_number
|
|
|
- //WHERE h.pasture_id = 1 AND h.heat_date >= '2025-01-16' AND h.cow_id >= 0
|
|
|
- //GROUP BY h.heat_date, h.frameid, c.pen_id ORDER BY h.heat_date, h.frameid, c.pen_name
|
|
|
if err := e.DB.Table(fmt.Sprintf("%s as h", new(model.NeckActiveHabit).TableName())).
|
|
|
Select("h.neck_ring_number,h.heat_date, h.frameid, c.pen_id, c.pen_name, COUNT(*) as nb, ROUND(AVG(h.change_high)) as change_high, ROUND(AVG(h.change_filter)) as change_filter").
|
|
|
Joins("JOIN cow as c ON h.neck_ring_number = c.neck_ring_number").
|
|
@@ -592,6 +634,15 @@ func (e *Entry) UpdateChangeAdJust(pastureId int64, xToday *XToday) error {
|
|
|
zaplog.Error("UpdateChangeAdJust-1", zap.Any("error", err), zap.Any("xToday", xToday))
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ // 更新所有的显示状态为否的记录为是
|
|
|
+ if err := e.DB.Model(new(model.NeckActiveHabit)).
|
|
|
+ Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
|
|
|
+ Where("pasture_id = ?", pastureId).
|
|
|
+ Where("is_show = ?", pasturePb.IsShow_No).
|
|
|
+ Update("is_show = ?", pasturePb.IsShow_Ok).Error; err != nil {
|
|
|
+ zaplog.Error("UpdateChangeAdJust-2", zap.Any("error", err), zap.Any("xToday", xToday))
|
|
|
+ }
|
|
|
return nil
|
|
|
}
|
|
|
|