neck_ring_merge.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. package crontab
  2. import (
  3. "fmt"
  4. "kpt-pasture/model"
  5. "kpt-pasture/util"
  6. "math"
  7. "sort"
  8. "strings"
  9. "time"
  10. pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
  11. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  12. "gitee.com/xuyiping_admin/pkg/xerr"
  13. "go.uber.org/zap"
  14. )
  15. const (
  16. MinChangeFilter = -99
  17. MinRuminaFilter = -99
  18. MinChewFilter = -99
  19. MinChangeHigh = -99
  20. DefaultNb = 30
  21. DefaultScore = 100
  22. )
  23. var (
  24. defaultLimit = int32(1000)
  25. calculateIsRunning bool
  26. )
  27. // NeckRingOriginalMerge 把脖环数据合并成2个小时的
  28. func (e *Entry) NeckRingOriginalMerge() (err error) {
  29. if ok := e.IsExistCrontabLog(NeckRingOriginal); !ok {
  30. newTime := time.Now()
  31. e.CreateCrontabLog(NeckRingOriginal)
  32. // 原始数据删除15天前的
  33. e.DB.Model(new(model.NeckRingOriginal)).
  34. Where("created_at < ?", newTime.AddDate(0, 0, -7).Unix()).
  35. Delete(new(model.NeckRingOriginal))
  36. // 活动数据删除6个月前的数据
  37. e.DB.Model(new(model.NeckActiveHabit)).
  38. Where("created_at < ?", newTime.AddDate(0, -2, 0).Unix()).
  39. Delete(new(model.NeckActiveHabit))
  40. }
  41. pastureList := e.FindPastureList()
  42. if pastureList == nil || len(pastureList) == 0 {
  43. return nil
  44. }
  45. for _, pasture := range pastureList {
  46. if err = e.OriginalMergeData(pasture.Id); err != nil {
  47. zaplog.Error("NeckRingOriginalMerge", zap.Any("OriginalMergeData", err), zap.Any("pasture", pasture))
  48. }
  49. }
  50. return nil
  51. }
  52. func (e *Entry) OriginalMergeData(pastureId int64) error {
  53. limit := e.Cfg.NeckRingLimit
  54. if limit <= 0 {
  55. limit = defaultLimit
  56. }
  57. neckRingList := make([]*model.NeckRingOriginal, 0)
  58. if err := e.DB.Model(new(model.NeckRingOriginal)).
  59. Where("is_show = ?", pasturePb.IsShow_No).
  60. Where("pasture_id = ?", pastureId).
  61. Limit(int(limit)).
  62. Find(&neckRingList).Error; err != nil {
  63. return xerr.WithStack(err)
  64. }
  65. if len(neckRingList) <= 0 {
  66. return nil
  67. }
  68. // 去重
  69. neckRingList = RemoveDuplicates(neckRingList)
  70. // 计算合并
  71. neckActiveHabitList := Recalculate(neckRingList)
  72. if len(neckActiveHabitList) <= 0 {
  73. return nil
  74. }
  75. for _, habit := range neckActiveHabitList {
  76. //更新脖环牛只相关信息 新数据直接插入
  77. historyNeckActiveHabit, ct := e.IsExistNeckActiveHabit(pastureId, habit.NeckRingNumber, habit.HeatDate, habit.Frameid)
  78. if ct <= 0 {
  79. if err := e.DB.Create(habit).Error; err != nil {
  80. zaplog.Info("NeckRingOriginalMergeData-1",
  81. zap.Any("err", err),
  82. zap.Any("neckActiveHabit", habit),
  83. )
  84. }
  85. } else {
  86. // 重新计算
  87. newNeckActiveHabit := e.againRecalculate(historyNeckActiveHabit)
  88. if newNeckActiveHabit == nil {
  89. continue
  90. }
  91. if err := e.DB.Model(new(model.NeckActiveHabit)).
  92. Select("rumina", "intake", "inactive", "gasp", "other", "high", "active", "is_show", "record_count").
  93. Where("id = ?", historyNeckActiveHabit.Id).
  94. Updates(newNeckActiveHabit).Error; err != nil {
  95. zaplog.Error("NeckRingOriginalMergeData-2",
  96. zap.Any("err", err),
  97. zap.Any("ct", ct),
  98. zap.Any("historyNeckActiveHabit", historyNeckActiveHabit),
  99. zap.Any("newNeckActiveHabit", newNeckActiveHabit),
  100. )
  101. }
  102. }
  103. if err := e.UpdateNeckRingOriginalIsShow(habit); err != nil {
  104. zaplog.Error("NeckRingOriginalMergeData-4",
  105. zap.Any("err", err),
  106. zap.Any("neckActiveHabit", habit),
  107. )
  108. }
  109. }
  110. return nil
  111. }
  112. func (e *Entry) UpdateNeckRingOriginalIsShow(habit *model.NeckActiveHabit) error {
  113. if err := e.DB.Model(new(model.NeckRingOriginal)).
  114. Where("pasture_id = ?", habit.PastureId).
  115. Where("neck_ring_number = ?", habit.NeckRingNumber).
  116. Where("active_date = ?", habit.HeatDate).
  117. Where("frameid IN (?)", util.FrameIds(habit.Frameid)).
  118. Update("is_show", pasturePb.IsShow_Ok).Error; err != nil {
  119. return xerr.WithStack(err)
  120. }
  121. return nil
  122. }
  123. // RemoveDuplicates 清洗一下数据,去掉重复的,如果有重复的,取最新的一条数据
  124. func RemoveDuplicates(records []*model.NeckRingOriginal) []*model.NeckRingOriginal {
  125. uniqueRecords := make(map[string]*model.NeckRingOriginal)
  126. // 遍历原始数组
  127. for _, record := range records {
  128. mapKey := fmt.Sprintf("%s%s%s%s%d", record.NeckRingNumber, model.JoinKey, record.ActiveDate, model.JoinKey, record.Frameid) // 0001/2023-12-04/0 0001/2023-12-03/4
  129. if existing, exists := uniqueRecords[mapKey]; exists {
  130. if record.CreatedAt > existing.CreatedAt {
  131. uniqueRecords[mapKey] = record
  132. }
  133. } else {
  134. uniqueRecords[mapKey] = record
  135. }
  136. }
  137. // 将 map 中的记录转换为切片
  138. result := make([]*model.NeckRingOriginal, 0, len(uniqueRecords))
  139. for _, record := range uniqueRecords {
  140. result = append(result, record)
  141. }
  142. return result
  143. }
  144. // Recalculate 合并计算
  145. func Recalculate(neckRingList []*model.NeckRingOriginal) []*model.NeckActiveHabit {
  146. originalMapData := make(map[string]*model.NeckRingOriginalMerge)
  147. // 合并成2个小时的
  148. for _, v := range neckRingList {
  149. xframeId := util.XFrameId(v.Frameid)
  150. mapKey := fmt.Sprintf("%s%s%s%s%d", v.NeckRingNumber, model.JoinKey, v.ActiveDate, model.JoinKey, xframeId) // 0001/2023-12-04/0 0001/2023-12-03/4
  151. if originalMapData[mapKey] == nil {
  152. originalMapData[mapKey] = new(model.NeckRingOriginalMerge)
  153. }
  154. originalMapData[mapKey].IsMageData(v, xframeId)
  155. }
  156. currTime := time.Now()
  157. res := make([]*model.NeckActiveHabit, 0)
  158. // 算平均值
  159. for k, v := range originalMapData {
  160. // 过滤掉合并后<6条数据,如果时间太短就晚点再算
  161. if v.RecordCount < model.DefaultRecordCount {
  162. currMaxXframeId := util.FrameIdMapReverse[int32(currTime.Hour())]
  163. activeDateString := fmt.Sprintf("%s %02d:00:00", v.ActiveDate, v.XframeId*2+1)
  164. activeDate, _ := time.Parse(model.LayoutTime, activeDateString)
  165. if currMaxXframeId-v.XframeId <= 1 && currTime.Add(-1*time.Hour).Unix() < activeDate.Unix() {
  166. delete(originalMapData, k)
  167. continue
  168. }
  169. }
  170. v.SumAvg()
  171. }
  172. if len(originalMapData) <= 0 {
  173. return res
  174. }
  175. res = model.NeckRingOriginalMap(originalMapData).ForMatData()
  176. sort.Sort(model.NeckActiveHabitSlice(res))
  177. return res
  178. }
  179. func (e *Entry) againRecalculate(data *model.NeckActiveHabit) *model.NeckActiveHabit {
  180. originalList := make([]*model.NeckRingOriginal, 0)
  181. frameIds := util.FrameIds(data.Frameid)
  182. sql := ""
  183. for _, frameId := range frameIds {
  184. sql += fmt.Sprintf(`SELECT * FROM neck_ring_original WHERE pasture_id = %d AND neck_ring_number = '%s' AND active_date = '%s' AND frameid = %d UNION ALL `, data.PastureId, data.NeckRingNumber, data.HeatDate, frameId)
  185. }
  186. if len(sql) > 0 {
  187. sql = strings.TrimSuffix(sql, "UNION ALL ")
  188. }
  189. if err := e.DB.Raw(sql).Find(&originalList).Error; err != nil {
  190. return nil
  191. }
  192. /*if err := e.DB.Model(new(model.NeckRingOriginal)).
  193. Where("pasture_id = ?", data.PastureId).
  194. Where("neck_ring_number = ?", data.NeckRingNumber).
  195. Where("active_date = ?", data.HeatDate).
  196. Where("frameid IN (?)", frameIds).
  197. Find(&originalList).Error; err != nil {
  198. return nil
  199. }*/
  200. originalList = RemoveDuplicates(originalList)
  201. newDataList := Recalculate(originalList)
  202. if len(newDataList) != 1 {
  203. return nil
  204. }
  205. res := newDataList[0]
  206. res.IsShow = pasturePb.IsShow_No
  207. return res
  208. }
  209. // computeIfPositiveElse 辅助函数来计算过滤值
  210. func computeIfPositiveElse(newValue, prevFilterValue float64, weightPrev, weightNew float64) float64 {
  211. return math.Ceil((prevFilterValue * weightPrev) + (weightNew * newValue))
  212. }
  213. // 计算 score 的逻辑
  214. func calculateScore(habit *model.NeckActiveHabit) int {
  215. // 第一部分逻辑
  216. var part1 float64
  217. switch {
  218. case (habit.CalvingAge <= 1 && habit.Lact >= 1) ||
  219. (habit.CalvingAge >= 2 && habit.CalvingAge <= 13 && (habit.SumRumina+habit.SumIntake) == 0) ||
  220. ((habit.Lact == 0 || habit.CalvingAge >= 14) && habit.ChangeFilter == -99):
  221. part1 = -199
  222. case habit.CalvingAge >= 2 && habit.CalvingAge <= 13:
  223. part1 = math.Min((float64(habit.SumRumina+habit.SumIntake)-(100+math.Min(7, float64(habit.CalvingAge))*60))/10*2, 0)
  224. case habit.ChangeFilter > -99:
  225. part1 = math.Min(0, math.Min(getValueOrDefault(float64(habit.ChangeFilter), 0), getValueOrDefault(float64(habit.SumMinHigh), 0)))*0.2 +
  226. math.Min(0, math.Min(getValueOrDefault(float64(habit.ChangeFilter), 0), getValueOrDefault(float64(habit.SumMinChew), 0)))*0.2 +
  227. getRuminaSumIntakeSumScore(float64(habit.SumRumina+habit.SumIntake)) + getAdditionalScore(habit)
  228. default:
  229. part1 = -299
  230. }
  231. // 第二部分逻辑
  232. var part2 float64
  233. switch {
  234. case habit.FirmwareVersion%100 >= 52:
  235. part2 = 1
  236. case habit.FirmwareVersion%100 >= 30 && habit.FirmwareVersion%100 <= 43:
  237. part2 = 0.8
  238. default:
  239. part2 = 0.6
  240. }
  241. // 最终 score
  242. return DefaultScore + int(math.Floor(part1*part2))
  243. }
  244. // 获取值或默认值
  245. func getValueOrDefault(value, defaultValue float64) float64 {
  246. if value > -99 {
  247. return value
  248. }
  249. return defaultValue
  250. }
  251. // 计算累计反刍得分
  252. func getRuminaSumIntakeSumScore(sum float64) float64 {
  253. switch {
  254. case sum < 80:
  255. return -30
  256. case sum < 180:
  257. return -20
  258. case sum < 280:
  259. return -10
  260. default:
  261. return 0
  262. }
  263. }
  264. // 计算额外得分
  265. func getAdditionalScore(habit *model.NeckActiveHabit) float64 {
  266. var score float64
  267. if (habit.SumRumina+habit.SumIntake < 280 || habit.SumMinHigh+habit.SumMinChew < -50) && habit.SumMaxHigh > 50 {
  268. score += 10
  269. }
  270. if habit.ChangeFilter < -30 && habit.ChangeFilter <= habit.SumMinHigh && habit.ChewFilter < -30 && habit.ChewFilter <= habit.SumMinChew {
  271. score -= 5
  272. }
  273. return score
  274. }