neck_ring_merge.go 8.5 KB


  1. package crontab
  2. import (
  3. "fmt"
  4. "kpt-pasture/model"
  5. "kpt-pasture/util"
  6. "math"
  7. "sort"
  8. "strings"
  9. "time"
  10. pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
  11. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  12. "gitee.com/xuyiping_admin/pkg/xerr"
  13. "go.uber.org/zap"
  14. )
  15. const (
  16. MinChangeFilter = -99
  17. MinRuminaFilter = -99
  18. MinChewFilter = -99
  19. MinChangeHigh = -99
  20. DefaultNb = 30
  21. DefaultScore = 100
  22. )
  23. var (
  24. defaultLimit = int32(1000)
  25. )
  26. // NeckRingOriginalMerge 把脖环数据合并成2个小时的
  27. func (e *Entry) NeckRingOriginalMerge() (err error) {
  28. if ok := e.IsExistCrontabLog(NeckRingOriginal); !ok {
  29. e.CreateCrontabLog(NeckRingOriginal)
  30. }
  31. pastureList := e.FindPastureList()
  32. if pastureList == nil || len(pastureList) == 0 {
  33. return nil
  34. }
  35. for _, pasture := range pastureList {
  36. if err = e.OriginalMergeData(pasture.Id); err != nil {
  37. zaplog.Error("NeckRingOriginalMerge", zap.Any("OriginalMergeData", err), zap.Any("pasture", pasture))
  38. }
  39. }
  40. return nil
  41. }
  42. func (e *Entry) OriginalMergeData(pastureId int64) error {
  43. limit := e.Cfg.NeckRingLimit
  44. if limit <= 0 {
  45. limit = defaultLimit
  46. }
  47. neckRingList := make([]*model.NeckRingOriginal, 0)
  48. if err := e.DB.Model(new(model.NeckRingOriginal)).
  49. Where("is_show = ?", pasturePb.IsShow_No).
  50. Where("pasture_id = ?", pastureId).
  51. Limit(int(limit)).
  52. Find(&neckRingList).Error; err != nil {
  53. return xerr.WithStack(err)
  54. }
  55. if len(neckRingList) <= 0 {
  56. return nil
  57. }
  58. // 去重
  59. neckRingList = RemoveDuplicates(neckRingList)
  60. // 计算合并
  61. neckActiveHabitList := Recalculate(neckRingList)
  62. if len(neckActiveHabitList) <= 0 {
  63. return nil
  64. }
  65. for _, habit := range neckActiveHabitList {
  66. //更新脖环牛只相关信息 新数据直接插入
  67. historyNeckActiveHabit, ct := e.IsExistNeckActiveHabit(pastureId, habit.NeckRingNumber, habit.HeatDate, habit.Frameid)
  68. if ct <= 0 {
  69. if err := e.DB.Create(habit).Error; err != nil {
  70. zaplog.Info("NeckRingOriginalMergeData-1",
  71. zap.Any("err", err),
  72. zap.Any("neckActiveHabit", habit),
  73. )
  74. }
  75. } else {
  76. // 重新计算
  77. newNeckActiveHabit := e.againRecalculate(historyNeckActiveHabit)
  78. if newNeckActiveHabit == nil {
  79. continue
  80. }
  81. if err := e.DB.Model(new(model.NeckActiveHabit)).
  82. Select("rumina", "intake", "inactive", "gasp", "other", "high", "active", "is_show", "record_count").
  83. Where("id = ?", historyNeckActiveHabit.Id).
  84. Updates(newNeckActiveHabit).Error; err != nil {
  85. zaplog.Error("NeckRingOriginalMergeData-2",
  86. zap.Any("err", err),
  87. zap.Any("ct", ct),
  88. zap.Any("historyNeckActiveHabit", historyNeckActiveHabit),
  89. zap.Any("newNeckActiveHabit", newNeckActiveHabit),
  90. )
  91. }
  92. }
  93. if err := e.UpdateNeckRingOriginalIsShow(habit); err != nil {
  94. zaplog.Error("NeckRingOriginalMergeData-4",
  95. zap.Any("err", err),
  96. zap.Any("neckActiveHabit", habit),
  97. )
  98. }
  99. }
  100. return nil
  101. }
  102. func (e *Entry) UpdateNeckRingOriginalIsShow(habit *model.NeckActiveHabit) error {
  103. if err := e.DB.Model(new(model.NeckRingOriginal)).
  104. Where("pasture_id = ?", habit.PastureId).
  105. Where("neck_ring_number = ?", habit.NeckRingNumber).
  106. Where("active_date = ?", habit.HeatDate).
  107. Where("frameid IN (?)", util.FrameIds(habit.Frameid)).
  108. Update("is_show", pasturePb.IsShow_Ok).Error; err != nil {
  109. return xerr.WithStack(err)
  110. }
  111. return nil
  112. }
  113. // RemoveDuplicates 清洗一下数据,去掉重复的,如果有重复的,取最新的一条数据
  114. func RemoveDuplicates(records []*model.NeckRingOriginal) []*model.NeckRingOriginal {
  115. uniqueRecords := make(map[string]*model.NeckRingOriginal)
  116. // 遍历原始数组
  117. for _, record := range records {
  118. mapKey := fmt.Sprintf("%s%s%s%s%d", record.NeckRingNumber, model.JoinKey, record.ActiveDate, model.JoinKey, record.Frameid) // 0001/2023-12-04/0 0001/2023-12-03/4
  119. if existing, exists := uniqueRecords[mapKey]; exists {
  120. if record.CreatedAt > existing.CreatedAt {
  121. uniqueRecords[mapKey] = record
  122. }
  123. } else {
  124. uniqueRecords[mapKey] = record
  125. }
  126. }
  127. // 将 map 中的记录转换为切片
  128. result := make([]*model.NeckRingOriginal, 0, len(uniqueRecords))
  129. for _, record := range uniqueRecords {
  130. result = append(result, record)
  131. }
  132. return result
  133. }
  134. // Recalculate 合并计算
  135. func Recalculate(neckRingList []*model.NeckRingOriginal) []*model.NeckActiveHabit {
  136. originalMapData := make(map[string]*model.NeckRingOriginalMerge)
  137. // 合并成2个小时的
  138. for _, v := range neckRingList {
  139. xframeId := util.XFrameId(v.Frameid)
  140. mapKey := fmt.Sprintf("%s%s%s%s%d", v.NeckRingNumber, model.JoinKey, v.ActiveDate, model.JoinKey, xframeId) // 0001/2023-12-04/0 0001/2023-12-03/4
  141. if originalMapData[mapKey] == nil {
  142. originalMapData[mapKey] = new(model.NeckRingOriginalMerge)
  143. }
  144. originalMapData[mapKey].IsMageData(v, xframeId)
  145. }
  146. currTime := time.Now().Local()
  147. res := make([]*model.NeckActiveHabit, 0)
  148. // 算平均值
  149. for k, v := range originalMapData {
  150. // 过滤掉合并后<6条数据,如果时间太短就晚点再算
  151. if v.RecordCount < model.DefaultRecordCount {
  152. currMaxXframeId := util.FrameIdMapReverse[int32(currTime.Hour())]
  153. activeDateString := fmt.Sprintf("%s %02d:00:00", v.ActiveDate, v.XframeId*2+1)
  154. activeDate, _ := util.TimeParseLocal(model.LayoutTime, activeDateString)
  155. if currMaxXframeId-v.XframeId <= 1 && currTime.Add(-1*time.Hour).Unix() < activeDate.Unix() {
  156. delete(originalMapData, k)
  157. continue
  158. }
  159. }
  160. v.SumAvg()
  161. }
  162. if len(originalMapData) <= 0 {
  163. return res
  164. }
  165. res = model.NeckRingOriginalMap(originalMapData).ForMatData()
  166. sort.Sort(model.NeckActiveHabitSlice(res))
  167. return res
  168. }
  169. func (e *Entry) againRecalculate(data *model.NeckActiveHabit) *model.NeckActiveHabit {
  170. originalList := make([]*model.NeckRingOriginal, 0)
  171. frameIds := util.FrameIds(data.Frameid)
  172. sql := ""
  173. for _, frameId := range frameIds {
  174. sql += fmt.Sprintf(`SELECT * FROM neck_ring_original WHERE pasture_id = %d AND neck_ring_number = '%s' AND active_date = '%s' AND frameid = %d UNION ALL `, data.PastureId, data.NeckRingNumber, data.HeatDate, frameId)
  175. }
  176. if len(sql) > 0 {
  177. sql = strings.TrimSuffix(sql, "UNION ALL ")
  178. }
  179. if err := e.DB.Raw(sql).Find(&originalList).Error; err != nil {
  180. return nil
  181. }
  182. originalList = RemoveDuplicates(originalList)
  183. newDataList := Recalculate(originalList)
  184. if len(newDataList) != 1 {
  185. return nil
  186. }
  187. res := newDataList[0]
  188. res.IsShow = pasturePb.IsShow_No
  189. return res
  190. }
  191. // computeIfPositiveElse 辅助函数来计算过滤值
  192. func computeIfPositiveElse(newValue, prevFilterValue float64, weightPrev, weightNew float64) float64 {
  193. return math.Ceil((prevFilterValue * weightPrev) + (weightNew * newValue))
  194. }
  195. // 计算 score 的逻辑
  196. func calculateScore(habit *model.NeckActiveHabit) int {
  197. // 第一部分逻辑
  198. var part1 float64
  199. switch {
  200. case (habit.CalvingAge <= 1 && habit.Lact >= 1) ||
  201. (habit.CalvingAge >= 2 && habit.CalvingAge <= 13 && (habit.SumRumina+habit.SumIntake) == 0) ||
  202. ((habit.Lact == 0 || habit.CalvingAge >= 14) && habit.ChangeFilter == -99):
  203. part1 = -199
  204. case habit.CalvingAge >= 2 && habit.CalvingAge <= 13:
  205. part1 = math.Min((float64(habit.SumRumina+habit.SumIntake)-(100+math.Min(7, float64(habit.CalvingAge))*60))/10*2, 0)
  206. case habit.ChangeFilter > -99:
  207. part1 = math.Min(0, math.Min(getValueOrDefault(float64(habit.ChangeFilter), 0), getValueOrDefault(float64(habit.SumMinHigh), 0)))*0.2 +
  208. math.Min(0, math.Min(getValueOrDefault(float64(habit.ChangeFilter), 0), getValueOrDefault(float64(habit.SumMinChew), 0)))*0.2 +
  209. getRuminaSumIntakeSumScore(float64(habit.SumRumina+habit.SumIntake)) + getAdditionalScore(habit)
  210. default:
  211. part1 = -299
  212. }
  213. // 第二部分逻辑
  214. var part2 float64
  215. versionMod := habit.FirmwareVersion % 100
  216. if versionMod >= 52 {
  217. part2 = 1
  218. } else if versionMod >= 30 && versionMod <= 43 {
  219. part2 = 0.8
  220. } else {
  221. part2 = 0.6
  222. }
  223. // 最终 score
  224. return DefaultScore + int(math.Floor(part1*part2))
  225. }
  226. // 获取值或默认值
  227. func getValueOrDefault(value, defaultValue float64) float64 {
  228. if value > -99 {
  229. return value
  230. }
  231. return defaultValue
  232. }
  233. // 计算累计反刍得分
  234. func getRuminaSumIntakeSumScore(sum float64) float64 {
  235. switch {
  236. case sum < 80:
  237. return -30
  238. case sum < 180:
  239. return -20
  240. case sum < 280:
  241. return -10
  242. default:
  243. return 0
  244. }
  245. }
  246. // 计算额外得分
  247. func getAdditionalScore(habit *model.NeckActiveHabit) float64 {
  248. var score float64
  249. if (habit.SumRumina+habit.SumIntake < 280 || habit.SumMinHigh+habit.SumMinChew < -50) && habit.SumMaxHigh > 50 {
  250. score += 10
  251. }
  252. if habit.ChangeFilter < -30 && habit.ChangeFilter <= habit.SumMinHigh && habit.ChewFilter < -30 && habit.ChewFilter <= habit.SumMinChew {
  253. score -= 5
  254. }
  255. return score
  256. }