package crontab import ( "fmt" "kpt-pasture/model" "kpt-pasture/util" "sort" "strings" "time" "gorm.io/gorm" "gitee.com/xuyiping_admin/pkg/logger/zaplog" "go.uber.org/zap" ) // UpdateRecognitionTime 识别时间超过40分钟未套杯牛只,识别改为未识别 func (e *Entry) UpdateRecognitionTime(pastureId int64, hall *model.MilkHall) { milkOriginalList := make([]*model.MilkOriginal, 0) if err := e.DB.Model(new(model.MilkOriginal)). Where("pasture_id = ?", pastureId). Where("milk_hall_number = ?", hall.Name). Where("milk_hall_brand = ?", hall.Brand). Where("load = ?", 0). Find(&milkOriginalList).Error; err != nil { zaplog.Error("MilkHallData", zap.Any("err", err)) } for _, v := range milkOriginalList { t1, _ := util.TimeParseLocal(model.LayoutTime, v.AttachTime) t2, _ := util.TimeParseLocal(model.LayoutTime, v.RecognitionTime) diff := t1.Sub(t2) minute := int(diff.Minutes()) if util.Substr(v.RecognitionTime, -1, 8) != "00:00:00" && minute > 40 { if err := e.DB.Model(new(model.MilkOriginal)). Where("id = ?", v.Id). Updates(map[string]interface{}{ "cow_id": 0, "ele_ear_number": "", "recognition_time": fmt.Sprintf("%s 00:00:00", util.Substr(v.RecognitionTime, 0, 10)), }).Error; err != nil { zaplog.Error("MilkHallData", zap.Any("err", err)) } } } } // UpdateRepeatCupSet1 更新重复套杯1, 识别时间相同,且不为0为重复套杯 func (e *Entry) UpdateRepeatCupSet1(milkOriginalList []*model.MilkOriginal) { if len(milkOriginalList) == 0 { return } milkOriginalMap := make(map[string][]*model.MilkOriginal) for _, v := range milkOriginalList { if strings.HasSuffix(v.RecognitionTime, "00:00:00") { continue } key := fmt.Sprintf("%s_%d_%d_%s", v.MilkDate, v.Shifts, v.DetachedAddress, v.RecognitionTime) milkOriginalMap[key] = append(milkOriginalMap[key], v) } for _, originalList := range milkOriginalMap { if len(originalList) >= 2 { // 按照Id升序排序(保留第一条) sort.Slice(originalList, func(i, j int) bool { return originalList[i].Id < originalList[j].Id }) for i, v := range originalList { if i == 0 { continue } if err := e.DB.Model(new(model.MilkOriginal)). Select("").Where("id = ?", v.Id). Update("nattach", 2).Error; err != nil { zaplog.Error("UpdateRepeatCupSet1", zap.Any("err", err)) } } } } } // UpdateMilkOriginCowInfo 更新牛只信息 func (e *Entry) UpdateMilkOriginCowInfo(milkOriginalList []*model.MilkOriginal, hall *model.MilkHall) { milkHallMap := make(map[string][]*model.MilkOriginal) for _, v := range milkOriginalList { key := fmt.Sprintf("%s", v.MilkHallNumber) milkHallMap[key] = append(milkHallMap[key], v) } dataList, ok := milkHallMap[hall.Name] if !ok { return } switch hall.IsExtraUpdate { case model.IsExtra0: case model.IsExtra1, model.IsExtra3: for _, d := range dataList { if d.EarNumber == "" { continue } cowInfo, err := e.GetCowByEarNumber(d.PastureId, d.EarNumber) if err != nil { zaplog.Error("UpdateMilkOriginCowInfo", zap.Any("err", err), zap.Any("data", d)) continue } // 更新牛只信息 d.UpdateCowInfo(cowInfo) if err = e.DB.Model(new(model.MilkOriginal)). Select("cow_id", "pen_id", "pen_name"). Where("id = ?", d.Id).Updates(d).Error; err != nil { zaplog.Error("UpdateMilkOriginCowInfo", zap.Any("err", err), zap.Any("data", d)) } } case model.IsExtra2: default: } } func (e *Entry) UpdateMilkOriginalInitialTimesAndAttachAdjustTime(shifts []int32, milkOriginalList []*model.MilkOriginal) { for _, shift := range shifts { shiftMinDetachTimes := "" // 按脱杯地址分组处理 addressMap := make(map[int64][]*model.MilkOriginal) for _, m := range milkOriginalList { if m.Shifts != shift || m.DetachedTime == "" { continue } if shiftMinDetachTimes == "" { shiftMinDetachTimes = m.DetachedTime } else { t1, _ := util.TimeParseLocal(model.LayoutTime, m.DetachedTime) t2, _ := util.TimeParseLocal(model.LayoutTime, shiftMinDetachTimes) if t2.Before(t1) { shiftMinDetachTimes = m.DetachedTime } } addressMap[m.DetachedAddress] = append(addressMap[m.DetachedAddress], m) } if shiftMinDetachTimes == "" { continue } bt, _ := util.TimeParseLocal(model.LayoutTime, shiftMinDetachTimes) b5 := bt.Add(-5*time.Minute).Format(model.LayoutHour) + "00:00" for _, list := range addressMap { // 对当前地址的记录按时间排序 sort.Slice(list, func(i, j int) bool { if list[i].MilkDate != list[j].MilkDate { return list[i].MilkDate < list[j].MilkDate } if list[i].Shifts != list[j].Shifts { return list[i].Shifts < list[j].Shifts } if list[i].DetachedAddress != list[j].DetachedAddress { return list[i].DetachedAddress < list[j].DetachedAddress } return list[i].Id < list[j].Id }) // 初始化变量,模拟SQL中的@address和@det var lastAddress int64 = 0 var lastDetachTime string = "2001-01-01 06:00:00" // 默认初始值 // 批量更新参数 var updateParams []struct { ID int64 InitialTimes string AttachAdjust string } for _, m := range list { var initialTimeStr string var attachAdjust string // 如果当前记录的脱杯地址与上一条不同,则使用基准时间b5 if m.DetachedAddress != lastAddress { initialTimeStr = b5 } else { // 否则使用上一条记录的脱杯时间 initialTimeStr = lastDetachTime } // 更新最后记录的地址和时间 lastAddress = m.DetachedAddress lastDetachTime = m.DetachedTime // 只有当initialTime不为空且与原有值不同时才需要更新 if initialTimeStr != "" { initialTime, _ := util.TimeParseLocal(model.LayoutTime, initialTimeStr) attachTime, _ := util.TimeParseLocal(model.LayoutTime, m.AttachTime) detachTime, _ := util.TimeParseLocal(model.LayoutTime, m.DetachedTime) // 条件1:attachtimes以'00:00:00'结尾或attachtimes <= initialtimes if strings.HasSuffix(m.AttachTime, "00:00:00") || attachTime.Before(initialTime) || attachTime.Equal(initialTime) { // 计算 detachtimes - (1.5 + duration)*60 秒 adjustTime1 := detachTime.Add(-time.Duration((90 + m.Duration*60)) * time.Second) // 取三者中的最大值 maxTime := util.FindMaxTime(attachTime, initialTime, adjustTime1) attachAdjust = maxTime.Format(model.LayoutTime) } else { // 计算 detachtimes - duration*60 秒 adjustTime2 := detachTime.Add(-time.Duration(m.Duration*60) * time.Second) // 取 attachtimes 和 adjustTime2 中的较小值 minTime := util.FindMinTime(attachTime, adjustTime2) // 再与 initialtimes 取较大值 maxTime := util.FindMaxTime(minTime, initialTime) attachAdjust = maxTime.Format(model.LayoutTime) } // 记录需要更新的字段 updateParams = append(updateParams, struct { ID int64 InitialTimes string AttachAdjust string }{ ID: m.Id, InitialTimes: initialTimeStr, AttachAdjust: attachAdjust, }) /*if err := e.DB.Model(new(model.MilkOriginal)). Select("initial_time"). Where("id = ?", m.Id). Update("initial_time", initialTime).Error; err != nil { zaplog.Error("UpdateMilkOriginalInitialTimesAndAttachAdjustTime", zap.Any("err", err)) }*/ } } if len(updateParams) > 0 { // 批量更新数据库 if err := e.DB.Transaction(func(tx *gorm.DB) error { for _, param := range updateParams { updates := map[string]interface{}{ "initial_times": param.InitialTimes, "attach_adjust_time": param.AttachAdjust, } if err := tx.Model(new(model.MilkOriginal)). Select("initial_time", "attach_adjust_time"). Where("id = ? ", param.ID). Updates(updates).Error; err != nil { return err } } return nil }); err != nil { zaplog.Error("UpdateMilkOriginalInitialTimesAndAttachAdjustTime", zap.Any("err", err)) } } } } } // UpdateRepeatCupSet2 非标准重复套杯 func (e *Entry) UpdateRepeatCupSet2(milkOriginalList []*model.MilkOriginal) { for _, v := range milkOriginalList { if v.AttachTime == "" || v.InitialTime == "" { continue } nattchTime, _ := util.TimeParseLocal(model.LayoutTime, v.AttachTime) initialTime, _ := util.TimeParseLocal(model.LayoutTime, v.InitialTime) if util.Substr(v.InitialTime, -1, 5) != "00:00" && v.Nattach == 0 && nattchTime.Sub(initialTime).Minutes() <= 1 { if err := e.DB.Model(new(model.MilkOriginal)). Select("nattach"). Where("id = ?", v.Id). Update("nattach", 2).Error; err != nil { zaplog.Error("UpdateRepeatCupSet2", zap.Any("err", err)) } } } } // UpdateMilkNattach 非标准重复套杯 func (e *Entry) UpdateMilkNattach(pastureId int64, milkClassConfig *MilkClassConfig, hall *model.MilkHall) { milkOriginalList := make([]*model.MilkOriginal, 0) if err := e.DB.Model(new(model.MilkOriginal)). Where("pasture_id = ?", pastureId). Where("id BETWEEN ? AND ?", milkClassConfig.OldUpdateMaxId+1, milkClassConfig.CurrentMaxId). Find(&milkOriginalList).Error; err != nil { zaplog.Error("DeleteRepeatMilkData", zap.Any("pastureId", pastureId), zap.Any("err", err)) return } for _, v := range milkOriginalList { if v.InitialTime == "" || v.AttachTime == "" { continue } attachTime, _ := util.TimeParseLocal(model.LayoutTime, v.AttachTime) initialTime, _ := util.TimeParseLocal(model.LayoutTime, v.InitialTime) initialTimePlus1Min := initialTime.Add(time.Minute) if hall.Brand == v.MilkHallBrand && hall.Name == v.MilkHallNumber && v.Nattach == 0 && !strings.HasSuffix(v.InitialTime, "00:00") && (attachTime.Before(initialTimePlus1Min) || attachTime.Equal(initialTimePlus1Min)) { if err := e.DB.Model(new(model.MilkOriginal)). Select("nattach"). Where("id = ?", v.Id). Update("nattach", 2).Error; err != nil { zaplog.Error("UpdateMilkNattach", zap.Any("err", err)) } } } } // UpdateMilkNoCowId 清理无牛号牛只:重复超过2圈牛号,套杯时间间隔大于17分钟 func (e *Entry) UpdateMilkNoCowId(pastureId int64, milkClassConfig *MilkClassConfig, hall *model.MilkHall) { // SELECT m2.wid // FROM (SELECT m0.milkdate, m0.shifts, m0.detacher_address, m0.cow_id, MIN(m0.wid) wid, COUNT(0) nb, MIN(m0.attachtimes) attachtimes // FROM milkweight m0 WHERE m0.wid BETWEEN xdminwid AND xdmaxwid AND m0.milkdate=xcurdate AND m0.cow_id>0 AND m0.station=xvarName // GROUP BY m0.shifts, m0.detacher_address, m0.cow_id HAVING nb>1) m1 // JOIN milkweight m2 ON m1.milkdate=m2.milkdate AND m1.shifts=m2.shifts AND m2.station=xvarName AND m1.detacher_address=m2.detacher_address // AND m1.cow_id=m2.cow_id AND m1.wid0 // AND (m1.attachtimes + INTERVAL 17 MINUTE) < m2.attachtimes ; minMilkOriginalRecordsList := make([]*model.MinMilkOriginalRecords, 0) if err := e.DB.Model(new(model.MilkOriginal)). Select("MIN(id) AS min_id, COUNT(0) AS count, cow_id, detached_address, milk_date, MIN(attach_time) AS min_attach_time"). Where("pasture_id = ?", pastureId). Where("id BETWEEN ? AND ?", milkClassConfig.OldUpdateMaxId+1, milkClassConfig.CurrentMaxId). Where("cow_id > ?", 0). Where("milk_hall_number = ?", hall.Name). Group("shifts, detached_address, cow_id"). Having("count > ? AND min_attach_time != ?", 1, ""). Find(&minMilkOriginalRecordsList).Error; err != nil { zaplog.Error("UpdateMilkNoCowId", zap.Any("err", err)) return } // 2. 收集需要更新的 id 列表 idsToUpdate := make([]int64, 0) for _, v := range minMilkOriginalRecordsList { var laterRecords []struct { Id int64 } attachTime, _ := util.TimeParseLocal(model.LayoutTime, v.MinAttachTime) attachTimeAdd17 := attachTime.Add(time.Minute * 17).Format(model.LayoutTime) if err := e.DB.Model(new(model.MilkOriginal)). Select("id"). Where("pasture_id = ?", pastureId). Where("id BETWEEN ? AND ?", milkClassConfig.OldUpdateMaxId+1, milkClassConfig.CurrentMaxId). Where("cow_id = ?", v.CowId). Where("detached_address = ?", v.DetachedAddress). Where("milk_date = ?", v.MilkDate). Where("attach_time > ?", attachTimeAdd17).Find(&laterRecords).Error; err != nil { zaplog.Error("UpdateMilkNoCowId", zap.Any("err", err)) continue } if len(laterRecords) > 0 { idsToUpdate = append(idsToUpdate, v.MinId) } } if len(idsToUpdate) > 0 { if err := e.DB.Model(new(model.MilkOriginal)). Where("id IN ?", idsToUpdate). Updates(map[string]interface{}{ "cow_id": 0, "ear_number": "", "pen_id": 0, "pen_name": "", }).Error; err != nil { zaplog.Error("UpdateMilkNoCowId", zap.Any("err", err)) } } } // UpdateMilkLoad 设置批次 圈数 func (e *Entry) UpdateMilkLoad(pastureId int64, milkClassConfig *MilkClassConfig, hall *model.MilkHall) { var ( xAddress = int32(999) //xShift = 6 xLoad = int32(0) recognitionTime = time.Date(2001, 1, 1, 1, 1, 1, 0, time.Local) ) baseRecords := make([]*model.BaseRecords, 0) if err := e.DB.Model(new(model.MilkOriginal)). Select("id,attach_adjust,detached_address,shifts,ear_number,milk_date,recognition_time"). Where("pasture_id = ?", pastureId). Where("id BETWEEN ? AND ?", milkClassConfig.OldUpdateMaxId+1, milkClassConfig.CurrentMaxId). Where("nattach = ?", 0).Where("milk_hall_number = ?", hall.Name). Order("attach_adjust,detached_address"). Limit(99).Find(&baseRecords).Error; err != nil { zaplog.Error("UpdateMilkLoad", zap.Any("err", err)) return } // 3. 处理每条记录并计算 load 值 recordsToUpdate := make([]*model.UpdateLoadRecord, 0) for _, record := range baseRecords { if record.RecognitionTime != "" && record.AttachAdjust != "" { r, _ := util.TimeParseLocal(model.LayoutTime, record.RecognitionTime) a, _ := util.TimeParseLocal(model.LayoutTime, record.AttachAdjust) diff := int32(a.Sub(r).Minutes()) if !(diff >= 0 && diff <= 15 && !strings.HasSuffix(record.RecognitionTime, "00:00:00")) { record.RecognitionTime = record.AttachAdjust } } r, _ := util.TimeParseLocal(model.LayoutTime, record.RecognitionTime) /*var maxLoad int32 if (record.DetachedAddress < xAddress-27 && r.After(recognitionTime.Add(-5*time.Minute))) || record.Shifts > xShift || r.After(recognitionTime.Add(6*time.Minute)) { xLoad++ maxLoad = xLoad } else { maxLoad = xLoad }*/ // 计算 currentLoad var currentLoad int32 if (xAddress+60-record.DetachedAddress <= 27) && r.Before(recognitionTime.Add(3*time.Minute)) { currentLoad = maxInt(xLoad-1, 1) } else { currentLoad = xLoad } // 更新地址 if (xAddress+60-record.DetachedAddress <= 27) && r.Before(recognitionTime.Add(3*time.Minute)) { // 保持原地址 } else { xAddress = record.DetachedAddress } // 更新shift和时间 //xShift = record.Shifts if r.After(recognitionTime) { recognitionTime = r } // 记录需要更新的数据 recordsToUpdate = append(recordsToUpdate, &model.UpdateLoadRecord{ Id: record.Id, Load: currentLoad, }) } if len(recordsToUpdate) > 0 { batchSize := 100 // 每批更新100条 for i := 0; i < len(recordsToUpdate); i += batchSize { end := i + batchSize if end > len(recordsToUpdate) { end = len(recordsToUpdate) } batch := recordsToUpdate[i:end] if err := e.DB.Model(new(model.MilkOriginal)). Where("id IN (?)", getWIDsFromUpdateRecords(batch)). Updates(map[string]interface{}{ "load": gorm.Expr("CASE id " + buildCaseWhen(batch) + " END"), }).Error; err != nil { zaplog.Error("UpdateMilkLoad", zap.Any("err", err)) continue } } } } // 辅助函数:从更新记录中提取WID列表 func getWIDsFromUpdateRecords(records []*model.UpdateLoadRecord) []int64 { ids := make([]int64, len(records)) for i, r := range records { ids[i] = r.Id } return ids } // 辅助函数:构建CASE WHEN语句 func buildCaseWhen(records []*model.UpdateLoadRecord) string { var builder strings.Builder for _, r := range records { builder.WriteString(fmt.Sprintf("WHEN %d THEN %d ", r.Id, r.Load)) } builder.WriteString("ELSE load") return builder.String() } // 辅助函数:返回两个整数中的最大值 func maxInt(a, b int32) int32 { if a > b { return a } return b }