milk_original_update_gea.go 16 KB


  1. package crontab
  2. import (
  3. "fmt"
  4. "kpt-pasture/model"
  5. "kpt-pasture/util"
  6. "sort"
  7. "strings"
  8. "time"
  9. "gorm.io/gorm"
  10. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  11. "go.uber.org/zap"
  12. )
  13. // UpdateRecognitionTime 识别时间超过40分钟未套杯牛只,识别改为未识别
  14. func (e *Entry) UpdateRecognitionTime(pastureId int64, hall *model.MilkHall) {
  15. milkOriginalList := make([]*model.MilkOriginal, 0)
  16. if err := e.DB.Model(new(model.MilkOriginal)).
  17. Where("pasture_id = ?", pastureId).
  18. Where("milk_hall_number = ?", hall.Name).
  19. Where("milk_hall_brand = ?", hall.Brand).
  20. Where("load = ?", 0).
  21. Find(&milkOriginalList).Error; err != nil {
  22. zaplog.Error("MilkHallData", zap.Any("err", err))
  23. }
  24. for _, v := range milkOriginalList {
  25. t1, _ := util.TimeParseLocal(model.LayoutTime, v.AttachTime)
  26. t2, _ := util.TimeParseLocal(model.LayoutTime, v.RecognitionTime)
  27. diff := t1.Sub(t2)
  28. minute := int(diff.Minutes())
  29. if util.Substr(v.RecognitionTime, -1, 8) != "00:00:00" && minute > 40 {
  30. if err := e.DB.Model(new(model.MilkOriginal)).
  31. Where("id = ?", v.Id).
  32. Updates(map[string]interface{}{
  33. "cow_id": 0,
  34. "ele_ear_number": "",
  35. "recognition_time": fmt.Sprintf("%s 00:00:00", util.Substr(v.RecognitionTime, 0, 10)),
  36. }).Error; err != nil {
  37. zaplog.Error("MilkHallData", zap.Any("err", err))
  38. }
  39. }
  40. }
  41. }
  42. // UpdateRepeatCupSet1 更新重复套杯1, 识别时间相同,且不为0为重复套杯
  43. func (e *Entry) UpdateRepeatCupSet1(milkOriginalList []*model.MilkOriginal) {
  44. if len(milkOriginalList) == 0 {
  45. return
  46. }
  47. milkOriginalMap := make(map[string][]*model.MilkOriginal)
  48. for _, v := range milkOriginalList {
  49. if strings.HasSuffix(v.RecognitionTime, "00:00:00") {
  50. continue
  51. }
  52. key := fmt.Sprintf("%s_%d_%d_%s", v.MilkDate, v.Shifts, v.DetachedAddress, v.RecognitionTime)
  53. milkOriginalMap[key] = append(milkOriginalMap[key], v)
  54. }
  55. for _, originalList := range milkOriginalMap {
  56. if len(originalList) >= 2 {
  57. // 按照Id升序排序(保留第一条)
  58. sort.Slice(originalList, func(i, j int) bool {
  59. return originalList[i].Id < originalList[j].Id
  60. })
  61. for i, v := range originalList {
  62. if i == 0 {
  63. continue
  64. }
  65. if err := e.DB.Model(new(model.MilkOriginal)).
  66. Select("").Where("id = ?", v.Id).
  67. Update("nattach", 2).Error; err != nil {
  68. zaplog.Error("UpdateRepeatCupSet1", zap.Any("err", err))
  69. }
  70. }
  71. }
  72. }
  73. }
  74. // UpdateMilkOriginCowInfo 更新牛只信息
  75. func (e *Entry) UpdateMilkOriginCowInfo(milkOriginalList []*model.MilkOriginal, hall *model.MilkHall) {
  76. milkHallMap := make(map[string][]*model.MilkOriginal)
  77. for _, v := range milkOriginalList {
  78. key := fmt.Sprintf("%s", v.MilkHallNumber)
  79. milkHallMap[key] = append(milkHallMap[key], v)
  80. }
  81. dataList, ok := milkHallMap[hall.Name]
  82. if !ok {
  83. return
  84. }
  85. switch hall.IsExtraUpdate {
  86. case model.IsExtra0:
  87. case model.IsExtra1, model.IsExtra3:
  88. for _, d := range dataList {
  89. if d.EarNumber == "" {
  90. continue
  91. }
  92. cowInfo, err := e.GetCowByEarNumber(d.PastureId, d.EarNumber)
  93. if err != nil {
  94. zaplog.Error("UpdateMilkOriginCowInfo", zap.Any("err", err), zap.Any("data", d))
  95. continue
  96. }
  97. // 更新牛只信息
  98. d.UpdateCowInfo(cowInfo)
  99. if err = e.DB.Model(new(model.MilkOriginal)).
  100. Select("cow_id", "pen_id", "pen_name").
  101. Where("id = ?", d.Id).Updates(d).Error; err != nil {
  102. zaplog.Error("UpdateMilkOriginCowInfo", zap.Any("err", err), zap.Any("data", d))
  103. }
  104. }
  105. case model.IsExtra2:
  106. default:
  107. }
  108. }
  109. func (e *Entry) UpdateMilkOriginalInitialTimesAndAttachAdjustTime(shifts []int32, milkOriginalList []*model.MilkOriginal) {
  110. for _, shift := range shifts {
  111. shiftMinDetachTimes := ""
  112. // 按脱杯地址分组处理
  113. addressMap := make(map[int64][]*model.MilkOriginal)
  114. for _, m := range milkOriginalList {
  115. if m.Shifts != shift || m.DetachedTime == "" {
  116. continue
  117. }
  118. if shiftMinDetachTimes == "" {
  119. shiftMinDetachTimes = m.DetachedTime
  120. } else {
  121. t1, _ := util.TimeParseLocal(model.LayoutTime, m.DetachedTime)
  122. t2, _ := util.TimeParseLocal(model.LayoutTime, shiftMinDetachTimes)
  123. if t2.Before(t1) {
  124. shiftMinDetachTimes = m.DetachedTime
  125. }
  126. }
  127. addressMap[m.DetachedAddress] = append(addressMap[m.DetachedAddress], m)
  128. }
  129. if shiftMinDetachTimes == "" {
  130. continue
  131. }
  132. bt, _ := util.TimeParseLocal(model.LayoutTime, shiftMinDetachTimes)
  133. b5 := bt.Add(-5*time.Minute).Format(model.LayoutHour) + "00:00"
  134. for _, list := range addressMap {
  135. // 对当前地址的记录按时间排序
  136. sort.Slice(list, func(i, j int) bool {
  137. if list[i].MilkDate != list[j].MilkDate {
  138. return list[i].MilkDate < list[j].MilkDate
  139. }
  140. if list[i].Shifts != list[j].Shifts {
  141. return list[i].Shifts < list[j].Shifts
  142. }
  143. if list[i].DetachedAddress != list[j].DetachedAddress {
  144. return list[i].DetachedAddress < list[j].DetachedAddress
  145. }
  146. return list[i].Id < list[j].Id
  147. })
  148. // 初始化变量,模拟SQL中的@address和@det
  149. var lastAddress int64 = 0
  150. var lastDetachTime string = "2001-01-01 06:00:00" // 默认初始值
  151. // 批量更新参数
  152. var updateParams []struct {
  153. ID int64
  154. InitialTimes string
  155. AttachAdjust string
  156. }
  157. for _, m := range list {
  158. var initialTimeStr string
  159. var attachAdjust string
  160. // 如果当前记录的脱杯地址与上一条不同,则使用基准时间b5
  161. if m.DetachedAddress != lastAddress {
  162. initialTimeStr = b5
  163. } else {
  164. // 否则使用上一条记录的脱杯时间
  165. initialTimeStr = lastDetachTime
  166. }
  167. // 更新最后记录的地址和时间
  168. lastAddress = m.DetachedAddress
  169. lastDetachTime = m.DetachedTime
  170. // 只有当initialTime不为空且与原有值不同时才需要更新
  171. if initialTimeStr != "" {
  172. initialTime, _ := util.TimeParseLocal(model.LayoutTime, initialTimeStr)
  173. attachTime, _ := util.TimeParseLocal(model.LayoutTime, m.AttachTime)
  174. detachTime, _ := util.TimeParseLocal(model.LayoutTime, m.DetachedTime)
  175. // 条件1:attachtimes以'00:00:00'结尾或attachtimes <= initialtimes
  176. if strings.HasSuffix(m.AttachTime, "00:00:00") || attachTime.Before(initialTime) || attachTime.Equal(initialTime) {
  177. // 计算 detachtimes - (1.5 + duration)*60 秒
  178. adjustTime1 := detachTime.Add(-time.Duration((90 + m.Duration*60)) * time.Second)
  179. // 取三者中的最大值
  180. maxTime := util.FindMaxTime(attachTime, initialTime, adjustTime1)
  181. attachAdjust = maxTime.Format(model.LayoutTime)
  182. } else {
  183. // 计算 detachtimes - duration*60 秒
  184. adjustTime2 := detachTime.Add(-time.Duration(m.Duration*60) * time.Second)
  185. // 取 attachtimes 和 adjustTime2 中的较小值
  186. minTime := util.FindMinTime(attachTime, adjustTime2)
  187. // 再与 initialtimes 取较大值
  188. maxTime := util.FindMaxTime(minTime, initialTime)
  189. attachAdjust = maxTime.Format(model.LayoutTime)
  190. }
  191. // 记录需要更新的字段
  192. updateParams = append(updateParams, struct {
  193. ID int64
  194. InitialTimes string
  195. AttachAdjust string
  196. }{
  197. ID: m.Id,
  198. InitialTimes: initialTimeStr,
  199. AttachAdjust: attachAdjust,
  200. })
  201. /*if err := e.DB.Model(new(model.MilkOriginal)).
  202. Select("initial_time").
  203. Where("id = ?", m.Id).
  204. Update("initial_time", initialTime).Error; err != nil {
  205. zaplog.Error("UpdateMilkOriginalInitialTimesAndAttachAdjustTime", zap.Any("err", err))
  206. }*/
  207. }
  208. }
  209. if len(updateParams) > 0 {
  210. // 批量更新数据库
  211. if err := e.DB.Transaction(func(tx *gorm.DB) error {
  212. for _, param := range updateParams {
  213. updates := map[string]interface{}{
  214. "initial_times": param.InitialTimes,
  215. "attach_adjust_time": param.AttachAdjust,
  216. }
  217. if err := tx.Model(new(model.MilkOriginal)).
  218. Select("initial_time", "attach_adjust_time").
  219. Where("id = ? ", param.ID).
  220. Updates(updates).Error; err != nil {
  221. return err
  222. }
  223. }
  224. return nil
  225. }); err != nil {
  226. zaplog.Error("UpdateMilkOriginalInitialTimesAndAttachAdjustTime", zap.Any("err", err))
  227. }
  228. }
  229. }
  230. }
  231. }
  232. // UpdateRepeatCupSet2 非标准重复套杯
  233. func (e *Entry) UpdateRepeatCupSet2(milkOriginalList []*model.MilkOriginal) {
  234. for _, v := range milkOriginalList {
  235. if v.AttachTime == "" || v.InitialTime == "" {
  236. continue
  237. }
  238. nattchTime, _ := util.TimeParseLocal(model.LayoutTime, v.AttachTime)
  239. initialTime, _ := util.TimeParseLocal(model.LayoutTime, v.InitialTime)
  240. if util.Substr(v.InitialTime, -1, 5) != "00:00" && v.Nattach == 0 && nattchTime.Sub(initialTime).Minutes() <= 1 {
  241. if err := e.DB.Model(new(model.MilkOriginal)).
  242. Select("nattach").
  243. Where("id = ?", v.Id).
  244. Update("nattach", 2).Error; err != nil {
  245. zaplog.Error("UpdateRepeatCupSet2", zap.Any("err", err))
  246. }
  247. }
  248. }
  249. }
  250. // UpdateMilkNattach 非标准重复套杯
  251. func (e *Entry) UpdateMilkNattach(pastureId int64, milkClassConfig *MilkClassConfig, hall *model.MilkHall) {
  252. milkOriginalList := make([]*model.MilkOriginal, 0)
  253. if err := e.DB.Model(new(model.MilkOriginal)).
  254. Where("pasture_id = ?", pastureId).
  255. Where("id BETWEEN ? AND ?", milkClassConfig.OldUpdateMaxId+1, milkClassConfig.CurrentMaxId).
  256. Find(&milkOriginalList).Error; err != nil {
  257. zaplog.Error("DeleteRepeatMilkData", zap.Any("pastureId", pastureId), zap.Any("err", err))
  258. return
  259. }
  260. for _, v := range milkOriginalList {
  261. if v.InitialTime == "" || v.AttachTime == "" {
  262. continue
  263. }
  264. attachTime, _ := util.TimeParseLocal(model.LayoutTime, v.AttachTime)
  265. initialTime, _ := util.TimeParseLocal(model.LayoutTime, v.InitialTime)
  266. initialTimePlus1Min := initialTime.Add(time.Minute)
  267. if hall.Brand == v.MilkHallBrand && hall.Name == v.MilkHallNumber && v.Nattach == 0 &&
  268. !strings.HasSuffix(v.InitialTime, "00:00") && (attachTime.Before(initialTimePlus1Min) || attachTime.Equal(initialTimePlus1Min)) {
  269. if err := e.DB.Model(new(model.MilkOriginal)).
  270. Select("nattach").
  271. Where("id = ?", v.Id).
  272. Update("nattach", 2).Error; err != nil {
  273. zaplog.Error("UpdateMilkNattach", zap.Any("err", err))
  274. }
  275. }
  276. }
  277. }
  278. // UpdateMilkNoCowId 清理无牛号牛只:重复超过2圈牛号,套杯时间间隔大于17分钟
  279. func (e *Entry) UpdateMilkNoCowId(pastureId int64, milkClassConfig *MilkClassConfig, hall *model.MilkHall) {
  280. // SELECT m2.wid
  281. // FROM (SELECT m0.milkdate, m0.shifts, m0.detacher_address, m0.cow_id, MIN(m0.wid) wid, COUNT(0) nb, MIN(m0.attachtimes) attachtimes
  282. // FROM milkweight m0 WHERE m0.wid BETWEEN xdminwid AND xdmaxwid AND m0.milkdate=xcurdate AND m0.cow_id>0 AND m0.station=xvarName
  283. // GROUP BY m0.shifts, m0.detacher_address, m0.cow_id HAVING nb>1) m1
  284. // JOIN milkweight m2 ON m1.milkdate=m2.milkdate AND m1.shifts=m2.shifts AND m2.station=xvarName AND m1.detacher_address=m2.detacher_address
  285. // AND m1.cow_id=m2.cow_id AND m1.wid<m2.wid WHERE m2.wid BETWEEN xdminwid AND xdmaxwid AND m2.milkdate=xcurdate AND m2.cow_id>0
  286. // AND (m1.attachtimes + INTERVAL 17 MINUTE) < m2.attachtimes ;
  287. minMilkOriginalRecordsList := make([]*model.MinMilkOriginalRecords, 0)
  288. if err := e.DB.Model(new(model.MilkOriginal)).
  289. Select("MIN(id) AS min_id, COUNT(0) AS count, cow_id, detached_address, milk_date, MIN(attach_time) AS min_attach_time").
  290. Where("pasture_id = ?", pastureId).
  291. Where("id BETWEEN ? AND ?", milkClassConfig.OldUpdateMaxId+1, milkClassConfig.CurrentMaxId).
  292. Where("cow_id > ?", 0).
  293. Where("milk_hall_number = ?", hall.Name).
  294. Group("shifts, detached_address, cow_id").
  295. Having("count > ? AND min_attach_time != ?", 1, "").
  296. Find(&minMilkOriginalRecordsList).Error; err != nil {
  297. zaplog.Error("UpdateMilkNoCowId", zap.Any("err", err))
  298. return
  299. }
  300. // 2. 收集需要更新的 id 列表
  301. idsToUpdate := make([]int64, 0)
  302. for _, v := range minMilkOriginalRecordsList {
  303. var laterRecords []struct {
  304. Id int64
  305. }
  306. attachTime, _ := util.TimeParseLocal(model.LayoutTime, v.MinAttachTime)
  307. attachTimeAdd17 := attachTime.Add(time.Minute * 17).Format(model.LayoutTime)
  308. if err := e.DB.Model(new(model.MilkOriginal)).
  309. Select("id").
  310. Where("pasture_id = ?", pastureId).
  311. Where("id BETWEEN ? AND ?", milkClassConfig.OldUpdateMaxId+1, milkClassConfig.CurrentMaxId).
  312. Where("cow_id = ?", v.CowId).
  313. Where("detached_address = ?", v.DetachedAddress).
  314. Where("milk_date = ?", v.MilkDate).
  315. Where("attach_time > ?", attachTimeAdd17).Find(&laterRecords).Error; err != nil {
  316. zaplog.Error("UpdateMilkNoCowId", zap.Any("err", err))
  317. continue
  318. }
  319. if len(laterRecords) > 0 {
  320. idsToUpdate = append(idsToUpdate, v.MinId)
  321. }
  322. }
  323. if len(idsToUpdate) > 0 {
  324. if err := e.DB.Model(new(model.MilkOriginal)).
  325. Where("id IN ?", idsToUpdate).
  326. Updates(map[string]interface{}{
  327. "cow_id": 0,
  328. "ear_number": "",
  329. "pen_id": 0,
  330. "pen_name": "",
  331. }).Error; err != nil {
  332. zaplog.Error("UpdateMilkNoCowId", zap.Any("err", err))
  333. }
  334. }
  335. }
  336. // UpdateMilkLoad 设置批次 圈数
  337. func (e *Entry) UpdateMilkLoad(pastureId int64, milkClassConfig *MilkClassConfig, hall *model.MilkHall) {
  338. var (
  339. xAddress = 999
  340. //xShift = 6
  341. xLoad = int32(0)
  342. recognitionTime = time.Date(2001, 1, 1, 1, 1, 1, 0, time.Local)
  343. )
  344. baseRecords := make([]*model.BaseRecords, 0)
  345. if err := e.DB.Model(new(model.MilkOriginal)).
  346. Select("id,attach_adjust,detached_address,shifts,ear_number,milk_date,recognition_time").
  347. Where("pasture_id = ?", pastureId).
  348. Where("id BETWEEN ? AND ?", milkClassConfig.OldUpdateMaxId+1, milkClassConfig.CurrentMaxId).
  349. Where("nattach = ?", 0).Where("milk_hall_number = ?", hall.Name).
  350. Order("attach_adjust,detached_address").
  351. Limit(99).Find(&baseRecords).Error; err != nil {
  352. zaplog.Error("UpdateMilkLoad", zap.Any("err", err))
  353. return
  354. }
  355. // 3. 处理每条记录并计算 load 值
  356. recordsToUpdate := make([]*model.UpdateLoadRecord, 0)
  357. for _, record := range baseRecords {
  358. if record.RecognitionTime != "" && record.AttachAdjust != "" {
  359. r, _ := util.TimeParseLocal(model.LayoutTime, record.RecognitionTime)
  360. a, _ := util.TimeParseLocal(model.LayoutTime, record.AttachAdjust)
  361. diff := int32(a.Sub(r).Minutes())
  362. if !(diff >= 0 && diff <= 15 && !strings.HasSuffix(record.RecognitionTime, "00:00:00")) {
  363. record.RecognitionTime = record.AttachAdjust
  364. }
  365. }
  366. r, _ := util.TimeParseLocal(model.LayoutTime, record.RecognitionTime)
  367. /*var maxLoad int32
  368. if (record.DetachedAddress < xAddress-27 && r.After(recognitionTime.Add(-5*time.Minute))) || record.Shifts > xShift || r.After(recognitionTime.Add(6*time.Minute)) {
  369. xLoad++
  370. maxLoad = xLoad
  371. } else {
  372. maxLoad = xLoad
  373. }*/
  374. // 计算 currentLoad
  375. var currentLoad int32
  376. if (xAddress+60-record.DetachedAddress <= 27) &&
  377. r.Before(recognitionTime.Add(3*time.Minute)) {
  378. currentLoad = maxInt(xLoad-1, 1)
  379. } else {
  380. currentLoad = xLoad
  381. }
  382. // 更新地址
  383. if (xAddress+60-record.DetachedAddress <= 27) &&
  384. r.Before(recognitionTime.Add(3*time.Minute)) {
  385. // 保持原地址
  386. } else {
  387. xAddress = record.DetachedAddress
  388. }
  389. // 更新shift和时间
  390. //xShift = record.Shifts
  391. if r.After(recognitionTime) {
  392. recognitionTime = r
  393. }
  394. // 记录需要更新的数据
  395. recordsToUpdate = append(recordsToUpdate, &model.UpdateLoadRecord{
  396. Id: record.Id,
  397. Load: currentLoad,
  398. })
  399. }
  400. if len(recordsToUpdate) > 0 {
  401. batchSize := 100 // 每批更新100条
  402. for i := 0; i < len(recordsToUpdate); i += batchSize {
  403. end := i + batchSize
  404. if end > len(recordsToUpdate) {
  405. end = len(recordsToUpdate)
  406. }
  407. batch := recordsToUpdate[i:end]
  408. if err := e.DB.Model(new(model.MilkOriginal)).
  409. Where("id IN (?)", getWIDsFromUpdateRecords(batch)).
  410. Updates(map[string]interface{}{
  411. "load": gorm.Expr("CASE id " + buildCaseWhen(batch) + " END"),
  412. }).Error; err != nil {
  413. zaplog.Error("UpdateMilkLoad", zap.Any("err", err))
  414. continue
  415. }
  416. }
  417. }
  418. }
  419. // 辅助函数:从更新记录中提取WID列表
  420. func getWIDsFromUpdateRecords(records []*model.UpdateLoadRecord) []int64 {
  421. ids := make([]int64, len(records))
  422. for i, r := range records {
  423. ids[i] = r.Id
  424. }
  425. return ids
  426. }
  427. // 辅助函数:构建CASE WHEN语句
  428. func buildCaseWhen(records []*model.UpdateLoadRecord) string {
  429. var builder strings.Builder
  430. for _, r := range records {
  431. builder.WriteString(fmt.Sprintf("WHEN %d THEN %d ", r.Id, r.Load))
  432. }
  433. builder.WriteString("ELSE load")
  434. return builder.String()
  435. }
  436. // 辅助函数:返回两个整数中的最大值
  437. func maxInt(a, b int32) int32 {
  438. if a > b {
  439. return a
  440. }
  441. return b
  442. }