package crontab import ( "fmt" "kpt-pasture/model" "kpt-pasture/util" "sort" "time" pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow" "gitee.com/xuyiping_admin/pkg/logger/zaplog" "gitee.com/xuyiping_admin/pkg/xerr" "go.uber.org/zap" ) // NeckRingEstrusWarning 脖环发情预警 func (e *Entry) NeckRingEstrusWarning() (err error) { pastureList := e.FindPastureList() if pastureList == nil || len(pastureList) == 0 { return nil } for _, pasture := range pastureList { if err = e.UpdateNeckRingWarning(pasture.Id); err != nil { zaplog.Error("UpdateNeckRingWarning", zap.Any("NeckRingEstrusWarning", err), zap.Any("pasture", pasture)) } } return nil } func (e *Entry) UpdateNeckRingWarning(pastureId int64) (err error) { // 先删除历史数据 if err = e.DB.Model(new(model.NeckRingEstrusWarning)). Where("pasture_id = ?", pastureId). Delete(new(model.NeckRingEstrusWarning)).Error; err != nil { return xerr.WithStack(err) } // 计算时间范围 now := time.Now() startTime := now.Add(-24 * time.Hour) neckRingEstrusList := make([]*model.NeckRingEstrus, 0) if err = e.DB.Table(fmt.Sprintf("%s as a", new(model.NeckRingEstrus).TableName())). Select("a.*"). Joins("JOIN cow as b ON a.cow_id = b.id AND a.pasture_id = b.pasture_id"). Where("a.pasture_id = ?", pastureId). Where("a.active_time >= ?", startTime.Format(model.LayoutTime)). Where("a.active_level >= ?", pasturePb.EstrusLevel_Low). Where("a.check_result IN (?)", []pasturePb.CheckResult_Kind{pasturePb.CheckResult_Pending, pasturePb.CheckResult_Correct}). Where("a.is_show = ?", pasturePb.IsShow_Ok). Where("a.is_peak >= ?", pasturePb.IsShow_Ok). Where("b.admission_status = ?", pasturePb.AdmissionStatus_Admission). Find(&neckRingEstrusList).Error; err != nil { return xerr.WithStack(err) } zaplog.Info("UpdateNeckRingWarning", zap.Any("neckRingEstrusList", neckRingEstrusList)) if len(neckRingEstrusList) == 0 { return nil } neckRingEstrusWarningList := e.GroupAndProcessData(neckRingEstrusList) zaplog.Info("UpdateNeckRingWarning", zap.Any("neckRingEstrusWarningList", neckRingEstrusWarningList)) if len(neckRingEstrusWarningList) > 0 { if err = e.DB.CreateInBatches(neckRingEstrusWarningList, 50).Error; err != nil { return xerr.WithStack(err) } } else { return nil } minId := e.getMinId(pastureId) // 更新HighChange字段 // e.UpdateHighChange(pastureId,minId) e.UpdateNeckRingWarningIsPeak(pastureId, minId) return nil } func (e *Entry) UpdateNeckRingWarningIsPeak(pastureId, minId int64) { sqlQuery := e.DB.Table(fmt.Sprintf("%s as a", new(model.NeckActiveHabit).TableName())). Select("1"). Where("a.id >= ?", minId). Where("a.cow_id = b.cow_id"). Where("a.created_at > UNIX_TIMESTAMP(b.date_time)") if err := e.DB.Table(fmt.Sprintf("%s as b", new(model.NeckRingEstrusWarning).TableName())). Where("b.pasture_id = ?", pastureId). Where("EXISTS (?)", sqlQuery).Update("is_peak", pasturePb.IsShow_Ok).Error; err != nil { zaplog.Error("UpdateNeckRingWarningIsPeak", zap.Any("err", err)) } } func (e *Entry) UpdateHighChange(pastureId, minId int64) { estrusWarningList, err := e.GetCowHighChange(pastureId, minId) if err != nil { zaplog.Error("UpdateHighChange", zap.Any("err", err)) return } zaplog.Info("UpdateHighChange", zap.Any("estrusWarningList", estrusWarningList)) for _, v := range estrusWarningList { neckRingEstrusWarning := &model.NeckRingEstrusWarning{} if err = e.DB.Model(new(model.NeckRingEstrusWarning)). Where("neck_ring_estrus_id = ?", v.NeckRingEstrusId). Find(neckRingEstrusWarning).Error; err != nil { zaplog.Error("UpdateHighChange", zap.Any("Find", err), zap.Any("v", v)) continue } if v.Nb1 <= model.MinNb1 { count := e.getCowHigh(pastureId, v.CowId, minId, v.DateTime) if count <= 0 { if err = e.DB.Model(new(model.NeckRingEstrusWarning)). Where("neck_ring_estrus_id = ?", v.NeckRingEstrusId). Where("cow_id = ?", v.CowId). Where("pasture_id = ?", pastureId).Delete(new(model.NeckRingEstrusWarning)).Error; err != nil { zaplog.Error("UpdateHighChange", zap.Any("Delete", err), zap.Any("v", v)) } continue } } if err = e.DB.Model(new(model.NeckRingEstrusWarning)). Where("neck_ring_estrus_id = ?", v.NeckRingEstrusId). Updates(map[string]interface{}{ "warning_kind": pasturePb.Warning_Estrus, "high_change": v.HighChange, }).Error; err != nil { zaplog.Error("UpdateHighChange", zap.Any("Updates", err), zap.Any("v", v)) continue } } } func (e *Entry) GroupAndProcessData(records []*model.NeckRingEstrus) []*model.NeckRingEstrusWarning { groups := make(map[int64]*model.GroupEstrusData) // 分组处理 for _, record := range records { // 从关联的Cow表获取信息(这里需要根据实际关联方式调整) // 假设已通过JOIN获取到CowID等信息 // 实际实现可能需要预加载Cow信息 key := record.CowId if _, exist := groups[key]; !exist { groups[key] = &model.GroupEstrusData{ CowId: record.CowId, PastureId: record.PastureId, Records: make([]*model.NeckRingEstrus, 0), EarNumber: record.EarNumber, NeckRingNumber: record.NeckRingNumber, Moved: false, } } // 检查是否在移牛列表中 // 假设通过CowID判断,需要根据实际关联关系调整 /*if _, moved := movedCows[record.CowId]; moved { groups[key].Moved = true }*/ groups[key].Records = append(groups[key].Records, record) } // 处理每个分组 var neckRingEstrusWarningList []*model.NeckRingEstrusWarning for _, group := range groups { if len(group.Records) == 0 { continue } // 排序记录 sort.Slice(group.Records, func(i, j int) bool { return group.Records[i].Id > group.Records[j].Id }) // 计算字段 latest := group.Records[0] maxId := findMaxId(group.Records) firstTime := findMaxTime(group.Records, func(r *model.NeckRingEstrus) string { return r.FirstTime }) dateTime := findMaxTime(group.Records, func(r *model.NeckRingEstrus) string { return r.ActiveTime }) neckRingEstrusWarning := model.NewNeckRingEstrusWarning( maxId, group.PastureId, group.CowId, group.EarNumber, group.NeckRingNumber, firstTime, dateTime, latest.LastTime, pasturePb.Warning_Estrus, latest.ActiveLevel, ) neckRingEstrusWarningList = append(neckRingEstrusWarningList, neckRingEstrusWarning) } return neckRingEstrusWarningList } func (e *Entry) GetCowHighChange(pastureId, minId int64) ([]*model.EstrusWarning, error) { nowTime := time.Now().Add(-48 * time.Hour) estrusWarningList := make([]*model.EstrusWarning, 0) if err := e.DB.Table(fmt.Sprintf("%s as a", new(model.NeckActiveHabit).TableName())). Select( "GROUP_CONCAT(IF(ROUND(a.change_filter*a.filter_correct/100,0)=-99,'',ROUND(a.change_filter*a.filter_correct/100,0)) ) as high_change", "b.neck_ring_estrus_id", "b.cow_id", "b.date_time", "COUNT(a.change_filter>-99) as nb1", "COUNT(a.change_filter=-99 AND a.created_at>=(STR_TO_DATE(b.date_time,'%Y-%m-%d %H:%i:%s') -INTERVAL 48 HOUR )) as nb2"). Joins("JOIN neck_ring_estrus_warning as b ON a.cow_id = b.cow_id AND a.pasture_id = b.pasture_id"). Where("a.id > ?", minId). Where("a.pasture_id = ?", pastureId). Where("a.created_at > ?", nowTime.Unix()). Group("b.neck_ring_estrus_id"). Having("nb1 <= ? AND nb2 >= ?", model.Nb1, model.Nb2). Find(&estrusWarningList). Error; err != nil { return nil, xerr.WithStack(err) } return estrusWarningList, nil } // getRecentMovedCows 辅助函数:检查是否在移牛事件 func (e *Entry) getRecentMovedCows(pastureId, cowId int64) bool { var count int64 startDate := time.Now().AddDate(0, 0, -2) table := &model.EventCowLog{CowId: cowId} if err := e.DB.Table(table.TableName()). Where("cow_id = ?", cowId). Where("pasture = ?", pastureId). Where("event_type = ?", pasturePb.EventType_Transfer_Ben). Where("event_at >= ?", startDate.Unix()). Count(&count).Error; err != nil { return false } if count > 0 { return true } return false } func (e *Entry) getMinId(pastureId int64) int64 { var minId int64 nowTime := time.Now().AddDate(0, 0, -2).Format(model.LayoutDate2) if err := e.DB.Model(new(model.NeckActiveHabit)). Select("MIN(id) as id"). Where("heat_date = ?", nowTime). Where("pasture_id = ?", pastureId). Scan(&minId).Error; err != nil { return 1 } return minId } func (e *Entry) getCowHigh(pastureId, cowId, minId int64, dateTime string) int64 { dateTimeUnix, _ := util.TimeParseLocal(model.LayoutTime, dateTime) dateTimeUnixStart := time.Time{} if dateTimeUnix.IsZero() { dateTimeUnixStart = time.Now().Add(-22 * time.Hour) } else { dateTimeUnixStart = dateTimeUnix.Add(-22 * time.Hour) } var count int64 if err := e.DB.Model(new(model.NeckActiveHabit)). Select("COUNT(0) as count"). Where("created_at BETWEEN ? AND ?", dateTimeUnixStart.Unix(), dateTimeUnix). Where("pasture_id = ?", pastureId). Where("cow_id = ?", cowId). Where("id >= ?", minId). Scan(&count).Error; err != nil { return 0 } return count } // 辅助函数:计算最大时间 func findMaxTime(records []*model.NeckRingEstrus, getter func(*model.NeckRingEstrus) string) string { var max time.Time for _, r := range records { t1 := getter(r) if t1 == "" { continue } t, err := util.TimeParseLocal(model.LayoutTime, t1) if err != nil { continue } if t.After(max) { max = t } } if max.IsZero() { return "" } return max.Format(model.LayoutTime) } func findMaxId(records []*model.NeckRingEstrus) int64 { maxId := int64(0) for _, v := range records { if v.Id > maxId { maxId = v.Id } } return maxId }