浏览代码

cow: growthCurve update

Yi 2 周之前
父节点
当前提交
273ac411c7
共有 5 个文件被更改,包括 160 次插入153 次删除
  1. 0 4
      model/neck_active_habit.go
  2. 0 15
      model/pen_behavior.go
  3. 74 1
      module/backend/cow.go
  4. 72 133
      module/crontab/pen_behavior.go
  5. 14 0
      util/util.go

+ 0 - 4
model/neck_active_habit.go

@@ -311,10 +311,6 @@ type WeeklyActiveModel struct {
 	High     int32  `json:"high"`
 }
 
-// 		Select(`h.cow_id, h.ear_number,
-//		SUM(IF(TIMESTAMPDIFF(HOUR, UNIX_TIMESTAMP(h.active_time), h.created_at)>9, 1, 0)) AS nb,
-//		COUNT(1) AS nba, ROUND(AVG(h.voltage), 0) AS voltage`).
-
 type NeckRingErrorModel struct {
 	CowId   int64
 	Nb      int32

+ 0 - 15
model/pen_behavior.go

@@ -129,21 +129,6 @@ func (p PenBehaviorSlice) ToPB() *pasturePb.BarnBehaviorCurveItem {
 	return res
 }
 
-type PenBehaviorModel struct {
-	Id         int64
-	PastureId  int64
-	ActiveDate string
-	PenId      int32
-	PenName    string
-	Frameid    int32
-	Rumina     int32
-	Intake     int32
-	Inactive   int32
-	Gasp       int32
-	High       int32
-	Active     int32
-}
-
 type PenBehaviorData struct {
 	PastureId  int64  `json:"pastureId"`
 	PenId      int32  `json:"penId"`

+ 74 - 1
module/backend/cow.go

@@ -416,7 +416,7 @@ func (s *StoreEntry) CowIQR(ctx context.Context, cowInfo *model.Cow, curveName s
 	return q1, q3
 }
 
-func (s *StoreEntry) GetNeckActiveHabitsConcurrent(cowInfo *model.Cow, cowIds []int64, dayRange []string) ([]*model.NeckActiveHabit, error) {
+func (s *StoreEntry) GetNeckActiveHabitsConcurrent2(cowInfo *model.Cow, cowIds []int64, dayRange []string) ([]*model.NeckActiveHabit, error) {
 	var neckActiveHabitList []*model.NeckActiveHabit
 	var batchSize = 100
 	var wg sync.WaitGroup
@@ -466,6 +466,79 @@ func (s *StoreEntry) GetNeckActiveHabitsConcurrent(cowInfo *model.Cow, cowIds []
 	return neckActiveHabitList, nil
 }
 
+func (s *StoreEntry) GetNeckActiveHabitsConcurrent(cowInfo *model.Cow, cowIds []int64, dayRange []string) ([]*model.NeckActiveHabit, error) {
+	const (
+		batchSize   = 100
+		workerCount = 5 // 并发数,可根据实际情况调整
+	)
+
+	// 结果收集通道
+	resultsChan := make(chan []*model.NeckActiveHabit, len(cowIds)/batchSize+1)
+	errChan := make(chan error, 1)
+	doneChan := make(chan struct{})
+
+	// 启动结果收集goroutine
+	var neckActiveHabitList []*model.NeckActiveHabit
+	go func() {
+		defer close(doneChan)
+		for batchResults := range resultsChan {
+			neckActiveHabitList = append(neckActiveHabitList, batchResults...)
+		}
+	}()
+
+	// 工作池
+	sem := make(chan struct{}, workerCount)
+
+	// 分发任务
+	go func() {
+		defer close(resultsChan)
+		defer close(errChan)
+
+		for i := 0; i < len(cowIds); i += batchSize {
+			end := i + batchSize
+			if end > len(cowIds) {
+				end = len(cowIds)
+			}
+			batch := cowIds[i:end]
+
+			sem <- struct{}{} // 获取令牌
+
+			go func(batch []int64) {
+				defer func() { <-sem }() // 释放令牌
+
+				var batchResults []*model.NeckActiveHabit
+				if err := s.DB.Table(new(model.NeckActiveHabit).TableName()).
+					Select("rumina,intake,inactive,gasp,high,active,active_time").
+					Where("pasture_id = ?", cowInfo.PastureId).
+					Where("is_show = ?", pasturePb.IsShow_Ok).
+					Where("cow_id IN (?)", batch).
+					Where("heat_date BETWEEN ? AND ?", dayRange[0], dayRange[len(dayRange)-1]).
+					Find(&batchResults).Error; err != nil {
+					select {
+					case errChan <- fmt.Errorf("batch query error: %v", err):
+					default: // 避免阻塞,如果已经有错误了
+					}
+					return
+				}
+
+				resultsChan <- batchResults
+			}(batch)
+		}
+
+		// 等待所有worker完成
+		for i := 0; i < cap(sem); i++ {
+			sem <- struct{}{}
+		}
+	}()
+
+	// 等待结果
+	select {
+	case err := <-errChan:
+		return nil, err
+	case <-doneChan:
+		return neckActiveHabitList, nil
+	}
+}
 func (s *StoreEntry) CowGrowthCurve(ctx context.Context, req *pasturePb.CowGrowthCurveRequest) (*pasturePb.CowGrowthCurveResponse, error) {
 	userModel, err := s.GetUserModel(ctx)
 	if err != nil {

+ 72 - 133
module/crontab/pen_behavior.go

@@ -3,10 +3,13 @@ package crontab
 import (
 	"fmt"
 	"kpt-pasture/model"
+	"kpt-pasture/util"
 	"math"
 	"sort"
 	"time"
 
+	"gitee.com/xuyiping_admin/pkg/xerr"
+
 	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
 
 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
@@ -20,142 +23,85 @@ func (e *Entry) UpdatePenBehavior() error {
 	}
 
 	for _, pasture := range pastureList {
-		conf, err := e.GetSystemNeckRingConfigure(pasture.Id, model.MaxPenBehavior)
-		if err != nil {
-			zaplog.Error("UpdatePenBehavior", zap.Any("pasture", pasture), zap.Any("err", err))
-			continue
-		}
-		e.PenBehavior(pasture.Id, conf.Value)
+		e.PenBehavior(pasture.Id)
 		e.UpdatePenBehaviorWeekData(pasture.Id)
 	}
 	return nil
 }
 
 // PenBehavior 栏舍行为曲线
-func (e *Entry) PenBehavior(pastureId, maxPenBehavior int64) {
-	// 1. 获取颈环原始数据
-	penBehaviorModelList, err := e.getNeckRingOriginalList(pastureId, maxPenBehavior)
-	if err != nil {
-		zaplog.Error("PenBehavior",
-			zap.Any("pastureId", pastureId),
-			zap.Any("maxPenBehavior", maxPenBehavior),
-			zap.Any("err", err),
-		)
-		return
+func (e *Entry) PenBehavior(pastureId int64) {
+	heatDate := time.Now().Local().AddDate(0, 0, -1).Format(model.LayoutDate2)
+	frameIds := util.FrameIdSlice
+
+	penBehaviorList := make([]*model.PenBehaviorData, 0)
+	for _, frameId := range frameIds {
+		// 1. 获取颈环原始数据
+		penBehaviorModel, err := e.getNeckRingOriginalList(pastureId, heatDate, frameId)
+		if err != nil {
+			zaplog.Error("PenBehavior",
+				zap.Any("pasture", pastureId),
+				zap.Any("frameId", frameId),
+				zap.Any("heatDate", heatDate),
+				zap.Any("err", err),
+			)
+			continue
+		}
+		if penBehaviorModel != nil {
+			penBehaviorList = append(penBehaviorList, penBehaviorModel)
+		}
 	}
 
-	if len(penBehaviorModelList) <= 0 {
+	if len(penBehaviorList) <= 0 {
 		return
 	}
 
-	// 2. 处理栏舍行为数据
-	penData := e.processPenBehaviorData(penBehaviorModelList)
-
-	// 3. 计算平均值和百分比
-	e.calculateAveragesAndRates(penData)
-
-	// 4. 保存数据
-	if err = e.savePenBehaviorData(penData); err != nil {
-		zaplog.Error("PenBehavior", zap.Any("penData", penData), zap.Any("err", err))
+	// 2. 保存数据
+	if err := e.savePenBehaviorData(penBehaviorList); err != nil {
+		zaplog.Error("PenBehavior", zap.Any("penBehaviorList", penBehaviorList), zap.Any("err", err))
 		return
 	}
-
-	sort.Slice(penBehaviorModelList, func(i, j int) bool {
-		return penBehaviorModelList[i].Id > penBehaviorModelList[j].Id
-	})
-	if err = e.UpdateSystemNeckRingConfigure(pastureId, model.MaxPenBehavior, penBehaviorModelList[0].Id); err != nil {
-		zaplog.Error("PenBehavior", zap.Any("MaxPenBehavior", err), zap.Any("penBehaviorModelList", penBehaviorModelList))
-	}
 }
 
 // getNeckRingOriginalList 获取颈环原始数据
-func (e *Entry) getNeckRingOriginalList(pastureId, maxPenBehavior int64) ([]*model.PenBehaviorModel, error) {
-	var penBehaviorModelList []*model.PenBehaviorModel
-	if err := e.DB.Table(fmt.Sprintf("%s as h", new(model.NeckRingOriginal).TableName())).
-		Joins("INNER JOIN cow as c ON h.pasture_id = c.pasture_id AND h.neck_ring_number = c.neck_ring_number").
-		Select("h.id,c.pasture_id, c.pen_id, c.pen_name, h.active_date, h.frameid, h.high, h.rumina, h.intake, h.inactive, h.gasp").
-		Where("h.id > ? AND h.pasture_id = ?", maxPenBehavior, pastureId).
-		Order("h.active_date,h.frameid").
-		Limit(int(defaultLimit)).
-		Find(&penBehaviorModelList).Error; err != nil {
-		return nil, err
-	}
-	return penBehaviorModelList, nil
-}
-
-// processPenBehaviorData 处理栏舍行为数据
-func (e *Entry) processPenBehaviorData(penBehaviorModelList []*model.PenBehaviorModel) map[string]*model.PenBehaviorData {
-	// 按active_date和frameid分组
-	activeDateFrameIdMap := make(map[string][]*model.PenBehaviorModel)
-	for _, v := range penBehaviorModelList {
-		key := fmt.Sprintf("%s_%d", v.ActiveDate, v.Frameid)
-		if activeDateFrameIdMap[key] == nil {
-			activeDateFrameIdMap[key] = make([]*model.PenBehaviorModel, 0)
-		}
-		activeDateFrameIdMap[key] = append(activeDateFrameIdMap[key], v)
-	}
-
-	// 按pen_id分组统计
-	penData := make(map[string]*model.PenBehaviorData)
-	for _, v := range activeDateFrameIdMap {
-		// 按pen_id分组
-		penIdMap := make(map[int32]*model.PenBehaviorData)
-		for _, item := range v {
-			if data, exists := penIdMap[item.PenId]; exists {
-				// 更新计数
-				data.CowCount++
-				// 更新平均值
-				data.AvgHigh += item.High
-				// 更新行为统计
-				data.SumRumina += ifThenElse(item.Rumina >= 8, 1, 0)
-				data.SumIntake += ifThenElse(item.Intake >= 8, 1, 0)
-				data.SumRest += ifThenElse(item.Inactive >= 8, 1, 0)
-				data.SumGasp += ifThenElse(item.Gasp >= 8, 1, 0)
-			} else {
-				penIdMap[item.PenId] = &model.PenBehaviorData{
-					PastureId: item.PastureId,
-					PenId:     item.PenId,
-					PenName:   item.PenName,
-					HeatDate:  item.ActiveDate,
-					Frameid:   item.Frameid,
-					CowCount:  1,
-					AvgHigh:   item.High,
-					SumRumina: ifThenElse(item.Rumina >= 8, 1, 0),
-					SumIntake: ifThenElse(item.Intake >= 8, 1, 0),
-					SumRest:   ifThenElse(item.Inactive >= 8, 1, 0),
-					SumGasp:   ifThenElse(item.Gasp >= 8, 1, 0),
-				}
-			}
-		}
-
-		// 将penIdMap的数据合并到penData中
-		for penId, data := range penIdMap {
-			key := fmt.Sprintf("%s_%d_%d", data.HeatDate, penId, data.Frameid)
-			penData[key] = data
-		}
-	}
-
-	return penData
-}
-
-// calculateAveragesAndRates 计算平均值和百分比
-func (e *Entry) calculateAveragesAndRates(penData map[string]*model.PenBehaviorData) {
-	for _, data := range penData {
-		// 计算平均值
-		data.AvgHigh = data.AvgHigh / data.CowCount
-		// 计算百分比
-		if data.CowCount > 0 {
-			data.RuminaRate = int32(float64(data.SumRumina) / float64(data.CowCount) * 100)
-			data.IntakeRate = int32(float64(data.SumIntake) / float64(data.CowCount) * 100)
-			data.RestRate = int32(float64(data.SumRest) / float64(data.CowCount) * 100)
-			data.GaspRate = int32(float64(data.SumGasp) / float64(data.CowCount) * 100)
-		}
+func (e *Entry) getNeckRingOriginalList(pastureId int64, dateTime string, frameId int32) (*model.PenBehaviorData, error) {
+	penBehaviorModel := &model.PenBehaviorData{}
+	sql := fmt.Sprintf(`
+		SELECT	bb.pasture_id, bb.heat_date, bb.frameid,
+		bb.pen_id, bb.pen_name,bb.cow_count, bb.avg_high, 
+		bb.sum_rumina, bb.sum_intake, bb.sum_rest, bb.sum_gasp, 
+		ROUND(bb.sum_rumina/bb.cow_count*100, 0) rumina_rate , 
+		ROUND(bb.sum_intake/bb.cow_count*100, 0) intake_rate, 
+		ROUND(bb.sum_rest/bb.cow_count*100, 0) rest_rate, 
+		ROUND(bb.sum_gasp/bb.cow_count*100, 0) gasp_rate
+		FROM (
+			SELECT aa.pasture_id, aa.pen_id, aa.pen_name, aa.heat_date, aa.frameid, 
+			COUNT(1) cow_count, 
+			ROUND(AVG(aa.high), 0) avg_high, 
+			SUM(IF(aa.rumina>=8, 1, 0)) sum_rumina, 
+			SUM(IF(aa.intake>=8, 1, 0)) sum_intake,  
+			SUM(IF(aa.inactive>=8, 1, 0)) sum_rest,  
+			SUM(IF(aa.gasp>=8, 1, 0) ) sum_gasp  
+			FROM (
+				SELECT c.pasture_id, c.ear_number, c.pen_id, c.pen_name, h.neck_ring_number, h.active_date as heat_date, 
+				h.frameid, h.high, h.rumina, h.intake, h.inactive, h.gasp 
+				FROM neck_ring_original h JOIN cow c ON h.pasture_id=c.pasture_id AND h.neck_ring_number=c.neck_ring_number 
+				WHERE h.pasture_id = %d
+				AND h.active_date='%s' 
+				AND h.frameid = %d
+				GROUP BY h.neck_ring_number
+			) aa GROUP BY aa.pen_id 
+		) bb`, pastureId, dateTime, frameId)
+
+	if err := e.DB.Raw(sql).First(penBehaviorModel).Error; err != nil {
+		return nil, xerr.WithStack(err)
 	}
+	return penBehaviorModel, nil
 }
 
 // savePenBehaviorData 保存栏舍行为数据
-func (e *Entry) savePenBehaviorData(penData map[string]*model.PenBehaviorData) error {
-	for _, data := range penData {
+func (e *Entry) savePenBehaviorData(penDataList []*model.PenBehaviorData) error {
+	for _, data := range penDataList {
 		// 构建活动时间
 		activeTime := e.calculateActiveTime(data.HeatDate, data.Frameid)
 		// 构建保存数据
@@ -165,27 +111,20 @@ func (e *Entry) savePenBehaviorData(penData map[string]*model.PenBehaviorData) e
 			if historyData == nil || historyData.Id <= 0 {
 				continue
 			}
-			// 计算新的总和和平均值
-			newCowCount := historyData.CowCount + penBehavior.CowCount
-			newAvgHigh := (historyData.AvgHigh*historyData.CowCount + penBehavior.AvgHigh*penBehavior.CowCount) / newCowCount
-			newSumRumina := historyData.SumRumina + penBehavior.SumRumina
-			newSumIntake := historyData.SumIntake + penBehavior.SumIntake
-			newSumRest := historyData.SumRest + penBehavior.SumRest
-			newSumGasp := historyData.SumGasp + penBehavior.SumGasp
 
 			if err := e.DB.Model(new(model.PenBehavior)).
 				Where("id = ?", historyData.Id).
 				Updates(map[string]interface{}{
-					"cow_count":   newCowCount,
-					"avg_high":    newAvgHigh,
-					"sum_rumina":  newSumRumina,
-					"sum_intake":  newSumIntake,
-					"sum_rest":    newSumRest,
-					"sum_gasp":    newSumGasp,
-					"rumina_rate": int32(float64(newSumRumina) / float64(newCowCount) * 100),
-					"intake_rate": int32(float64(newSumIntake) / float64(newCowCount) * 100),
-					"rest_rate":   int32(float64(newSumRest) / float64(newCowCount) * 100),
-					"gasp_rate":   int32(float64(newSumGasp) / float64(newCowCount) * 100),
+					"cow_count":   data.CowCount,
+					"avg_high":    data.AvgHigh,
+					"sum_rumina":  data.SumRumina,
+					"sum_intake":  data.SumIntake,
+					"sum_rest":    data.SumRest,
+					"sum_gasp":    data.SumGasp,
+					"rumina_rate": data.RuminaRate,
+					"intake_rate": data.IntakeRate,
+					"rest_rate":   data.RestRate,
+					"gasp_rate":   data.GaspRate,
 				}).Error; err != nil {
 				zaplog.Error("savePenBehaviorData", zap.Any("penBehavior", penBehavior), zap.Any("err", err))
 			}

+ 14 - 0
util/util.go

@@ -63,6 +63,20 @@ var (
 		23: 118,
 	}
 	ExpectedFrameIDs = []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}
+	FrameIdSlice     = []int32{
+		1, 2, 3, 4, 5, 6, 8,
+		11, 12, 13, 14, 15, 16, 18,
+		21, 22, 23, 24, 25, 26, 28,
+		31, 32, 33, 34, 35, 36, 38,
+		41, 42, 43, 44, 45, 46, 48,
+		51, 52, 53, 54, 55, 56, 58,
+		61, 62, 63, 64, 65, 66, 68,
+		71, 72, 73, 74, 75, 76, 78,
+		81, 82, 83, 84, 85, 86, 88,
+		91, 92, 93, 94, 95, 96, 98,
+		101, 102, 103, 104, 105, 106, 108,
+		111, 112, 113, 114, 115, 116, 118,
+	}
 )
 
 // GenerateRandomNumberString 生成指定长度的数字串