milk_daily.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package crontab
  2. import (
  3. "kpt-pasture/model"
  4. "kpt-pasture/util"
  5. "time"
  6. "gitee.com/xuyiping_admin/pkg/xerr"
  7. pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
  8. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  9. "go.uber.org/zap"
  10. )
  11. func (e *Entry) InsertMilkDaily() error {
  12. pastureList := e.FindPastureList()
  13. if pastureList == nil || len(pastureList) == 0 {
  14. return nil
  15. }
  16. for _, pasture := range pastureList {
  17. // 获取最大记录日期或默认日期
  18. pastureId := pasture.Id
  19. var maxDateTime string
  20. if err := e.DB.Model(new(model.MilkDaily)).
  21. Where("pasture_id = ?", pastureId).
  22. Select("IFNULL(MAX(heat_date) + INTERVAL 1 DAY, CURDATE() - INTERVAL 1 MONTH) AS max_date_time").
  23. Scan(&maxDateTime).Error; err != nil {
  24. zaplog.Error("InsertMilkDaily", zap.Any("pastureId", pastureId), zap.Any("err", err))
  25. continue
  26. }
  27. maxDate, err := util.TimeParseLocal(model.LayoutDate2, maxDateTime)
  28. if err != nil {
  29. zaplog.Error("InsertMilkDaily", zap.Any("pastureId", pastureId), zap.Any("err", err))
  30. continue
  31. }
  32. e.ProcessMilkDaily(pastureId, maxDate)
  33. }
  34. return nil
  35. }
  36. func (e *Entry) ProcessMilkDaily(pastureId int64, maxDate time.Time) {
  37. nowTime := time.Now().Local()
  38. // 处理每一天的数据
  39. for maxDate.Before(nowTime) {
  40. // 处理有胎次的奶牛
  41. processCowIds := make([]int64, 0)
  42. cowIds, err := e.processCowsWithLact(pastureId, maxDate)
  43. if err != nil {
  44. zaplog.Error("ProcessMilkDaily", zap.Any("processCowsWithFetal", err))
  45. }
  46. if len(cowIds) > 0 {
  47. processCowIds = append(processCowIds, cowIds...)
  48. }
  49. // 处理无胎次的奶牛
  50. cowIds, err = e.processCowsWithNoLact(pastureId, maxDate)
  51. if err != nil {
  52. zaplog.Error("ProcessMilkDaily", zap.Any("processCowsWithoutFetal", err))
  53. }
  54. if len(cowIds) > 0 {
  55. processCowIds = append(processCowIds, cowIds...)
  56. }
  57. if len(processCowIds) > 0 {
  58. e.UpdateMilkDaily(pastureId, processCowIds, maxDate.Format(model.LayoutDate2))
  59. }
  60. // 日期加1天
  61. maxDate = maxDate.AddDate(0, 0, 1)
  62. }
  63. }
  64. // UpdateMilkDaily
  65. // SELECT h.intCowId, ROUND(AVG(h.filterhigh), 0) high, ROUND(AVG( h.rumina)*12, 0) rumina, ROUND(AVG( h.intake)*12, 0) intake,
  66. // // ROUND(AVG( h.inactive)*12, 0) inactive, ROUND(AVG( h.act)*12, 0) act, COUNT(1) nb
  67. // // FROM h_activehabit h WHERE h.intPastureId=PastuId AND h.heatdate=xDate GROUP BY h.intCowId HAVING nb>=8
  68. func (e *Entry) UpdateMilkDaily(pastureId int64, processCowIds []int64, heatDate string) error {
  69. neckActiveHabitList := make([]*model.NeckActiveHabit, 0)
  70. if err := e.DB.Model(new(model.NeckActiveHabit)).
  71. Select(`cow_id, ROUND(AVG(filter_high), 0) AS high, ROUND(AVG(rumina)*12, 0) AS rumina, ROUND(AVG(intake)*12, 0) AS intake,
  72. ROUND(AVG(inactive)*12, 0) AS inactive, ROUND(AVG(active)*12, 0) AS active, COUNT(1) AS record_count`).
  73. Where("pasture_id = ?", pastureId).
  74. Where("cow_id IN ?", processCowIds).
  75. Where("heat_date = ?", heatDate).
  76. Group("cow_id").
  77. Find(&neckActiveHabitList).Error; err != nil {
  78. return xerr.WithStack(err)
  79. }
  80. for _, v := range neckActiveHabitList {
  81. // todo 先不过滤
  82. /*if v.RecordCount < 8 {
  83. continue
  84. }*/
  85. if err := e.DB.Model(new(model.MilkDaily)).
  86. Where("pasture_id = ?", pastureId).
  87. Where("cow_id = ?", v.CowId).
  88. Where("heat_date = ?", heatDate).
  89. Updates(map[string]interface{}{
  90. "day_high": v.High,
  91. "day_rumina": v.Rumina,
  92. "day_intake": v.Intake,
  93. "day_inactive": v.Inactive,
  94. "day_active": v.Active,
  95. }).Error; err != nil {
  96. zaplog.Error("UpdateMilkDaily", zap.Any("pastureId", pastureId), zap.Any("cowId", v.CowId), zap.Any("heatDate", heatDate), zap.Any("err", err))
  97. continue
  98. }
  99. }
  100. return nil
  101. }
  102. // 处理有胎次的奶牛
  103. func (e *Entry) processCowsWithLact(pastureId int64, recordDate time.Time) ([]int64, error) {
  104. // 查询有胎次且在记录日期前有记录的奶牛
  105. cowList := make([]*model.Cow, 0)
  106. if err := e.DB.Model(new(model.Cow)).
  107. Where("pasture_id = ?", pastureId).
  108. Where("admission_status = ?", pasturePb.AdmissionStatus_Admission).
  109. Where("lact > ?", 0).
  110. Where("last_calving_at <= ?", recordDate.Local().Unix()).
  111. Find(&cowList).Error; err != nil {
  112. return nil, err
  113. }
  114. // 批量插入数据
  115. milkDailyList := make([]*model.MilkDaily, 0)
  116. cowIds := make([]int64, 0)
  117. for _, cow := range cowList {
  118. calvingDate := ""
  119. if cow.LastCalvingAt > 0 {
  120. calvingDate = time.Unix(cow.LastCalvingAt, 0).Local().Format(model.LayoutDate2)
  121. }
  122. milkDaily := &model.MilkDaily{
  123. CowId: cow.Id,
  124. Lact: cow.Lact,
  125. PastureId: cow.PastureId,
  126. EarNumber: cow.EarNumber,
  127. CalvingDate: calvingDate,
  128. HeatDate: recordDate.Format(model.LayoutDate2),
  129. LactationAge: cow.LactationAge,
  130. PenId: cow.PenId,
  131. PenName: cow.PenName,
  132. BreedStatus: cow.BreedStatus,
  133. }
  134. milkDailyList = append(milkDailyList, milkDaily)
  135. cowIds = append(cowIds, cow.Id)
  136. }
  137. if len(milkDailyList) > 0 {
  138. // 分批次插入数据
  139. for i := 0; i < len(milkDailyList); i += 100 {
  140. end := i + 100
  141. if end > len(milkDailyList) {
  142. end = len(milkDailyList)
  143. }
  144. if err := e.DB.Model(new(model.MilkDaily)).
  145. Create(milkDailyList[i:end]).Error; err != nil {
  146. return nil, err
  147. }
  148. }
  149. }
  150. return cowIds, nil
  151. }
  152. // 处理无胎次的奶牛
  153. func (e *Entry) processCowsWithNoLact(pastureId int64, recordDate time.Time) ([]int64, error) {
  154. // 查询无胎次且EID1>0的奶牛
  155. cowList := make([]*model.Cow, 0)
  156. err := e.DB.Model(new(model.Cow)).
  157. //Select("c.intCowId as cow_id, c.intPastureId as pasture_id, c.varCowCode as cow_code,c.dateBirthDate as birth_date, c.intCurBar as cur_bar").
  158. //Where("(c.dateLeave IS NULL OR c.dateLeave > ?) AND c.intCurFetal = 0 AND c.EID1 > 0", recordDate).
  159. Where("pasture_id = ?", pastureId).
  160. Where("admission_status = ?", pasturePb.AdmissionStatus_Admission).
  161. Where("lact = ?", 0).
  162. Where("neck_ring_number != ?", "").
  163. Find(&cowList).Error
  164. if err != nil {
  165. return nil, err
  166. }
  167. // 批量插入数据
  168. milkDailyList := make([]*model.MilkDaily, 0)
  169. cowIds := make([]int64, 0)
  170. for _, cow := range cowList {
  171. birthDate, calvingDate := "", ""
  172. if cow.BirthAt > 0 {
  173. birthDate = time.Unix(cow.BirthAt, 0).Local().Format(model.LayoutDate2)
  174. }
  175. if cow.LastCalvingAt > 0 {
  176. calvingDate = time.Unix(cow.LastCalvingAt, 0).Local().Format(model.LayoutDate2)
  177. }
  178. milkDaily := &model.MilkDaily{
  179. CowId: cow.Id,
  180. Lact: cow.Lact,
  181. PastureId: cow.PastureId,
  182. EarNumber: cow.EarNumber,
  183. BirthDate: birthDate,
  184. CalvingDate: calvingDate,
  185. LactationAge: cow.LactationAge,
  186. HeatDate: recordDate.Format(model.LayoutDate2),
  187. PenId: cow.PenId,
  188. PenName: cow.PenName,
  189. BreedStatus: cow.BreedStatus,
  190. }
  191. milkDailyList = append(milkDailyList, milkDaily)
  192. cowIds = append(cowIds, cow.Id)
  193. }
  194. if len(milkDailyList) > 0 {
  195. if err = e.DB.Model(new(model.MilkDaily)).
  196. Create(&milkDailyList).Error; err != nil {
  197. return nil, err
  198. }
  199. }
  200. return cowIds, nil
  201. }