Browse Source

cow: behaviorCurve 牛只行为曲线

Yi 1 week ago
parent
commit
28d22385f1
1 changed files with 56 additions and 99 deletions
  1. 56 99
      module/backend/cow.go

+ 56 - 99
module/backend/cow.go

@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"kpt-pasture/model"
 	"kpt-pasture/util"
-	"log"
 	"net/http"
 	"sort"
 	"strings"
@@ -248,7 +247,7 @@ func (s *StoreEntry) BehaviorCurve(ctx context.Context, req *pasturePb.CowBehavi
 	}
 
 	data := model.NeckActiveHabitSlice(neckActiveHabitList).ToPB(req.CurveName)
-	q1, q3 := s.CowIQR(ctx, cowInfo, req.CurveName, dayRange, data.DateTimeList)
+	q1, q3 := s.CowIQR(cowInfo, req.CurveName, dayRange, data.DateTimeList)
 	data.IQR3 = q3
 	data.IQR1 = q1
 	eventMapList := s.EventTypeMap()
@@ -325,7 +324,7 @@ func (s *StoreEntry) BehaviorCurve(ctx context.Context, req *pasturePb.CowBehavi
 	}, nil
 }
 
-func (s *StoreEntry) CowIQR(ctx context.Context, cowInfo *model.Cow, curveName string, dayRange []string, dateTimeList []string) ([]int32, []int32) {
+func (s *StoreEntry) CowIQR(cowInfo *model.Cow, curveName string, dayRange []string, dateTimeList []string) ([]int32, []int32) {
 	q1, q3 := make([]int32, 0), make([]int32, 0)
 	if curveName == "" || curveName == "active" || curveName == "behavior" || len(dateTimeList) <= 0 {
 		return q1, q3
@@ -346,48 +345,47 @@ func (s *StoreEntry) CowIQR(ctx context.Context, cowInfo *model.Cow, curveName s
 		cowIds = append(cowIds, cow.Id)
 	}
 
-	/*// 行为曲线数据
-	neckActiveHabitList := make([]*model.NeckActiveHabit, 0)
-	if err := s.DB.Table(new(model.NeckActiveHabit).TableName()).
-		Select("rumina,intake,inactive,gasp,high,active,active_time").
-		Where("pasture_id = ?", cowInfo.PastureId).
-		Where("is_show = ?", pasturePb.IsShow_Ok).
-		Where("cow_id IN (?)", cowIds).
-		Where("heat_date BETWEEN ? AND ?", dayRange[0], dayRange[len(dayRange)-1]).
-		Find(&neckActiveHabitList).Error; err != nil {
-		return q1, q3
-	}*/
-
 	neckActiveHabitList, err := s.GetNeckActiveHabitsConcurrent(cowInfo, cowIds, dayRange)
 	if err != nil {
 		return q1, q3
 	}
 
+	neckActiveHabitMap := make(map[string][]*model.NeckActiveHabit)
+	for _, habit := range neckActiveHabitList {
+		activeTime := habit.ActiveTime[:13]
+		neckActiveHabitMap[activeTime] = append(neckActiveHabitMap[activeTime], habit)
+	}
+
 	penOriginal := make(map[string][]int32)
 	for _, dt := range dateTimeList {
-		var info bool
-		for _, habit := range neckActiveHabitList {
-			if !strings.Contains(habit.ActiveTime, dt) {
-				continue
-			}
-			info = true
+		habits, ok := neckActiveHabitMap[dt]
+		if !ok {
+			penOriginal[dt] = append(penOriginal[dt], int32(0))
+			continue
+		}
 
-			switch curveName {
-			case "rumina":
+		switch curveName {
+		case "rumina":
+			for _, habit := range habits {
 				penOriginal[dt] = append(penOriginal[dt], habit.Rumina)
-			case "intake":
-				penOriginal[dt] = append(penOriginal[dt], habit.Intake)
-			case "inactive":
+			}
+		case "intake":
+			for _, habit := range habits {
+				penOriginal[dt] = append(penOriginal[dt], habit.Inactive)
+			}
+		case "inactive":
+			for _, habit := range habits {
 				penOriginal[dt] = append(penOriginal[dt], habit.Inactive)
-			case "chew":
+			}
+		case "chew":
+			for _, habit := range habits {
 				penOriginal[dt] = append(penOriginal[dt], habit.Rumina+habit.Intake)
-			case "immobility":
+			}
+		case "immobility":
+			for _, habit := range habits {
 				penOriginal[dt] = append(penOriginal[dt], 120-habit.Active)
 			}
 		}
-		if !info {
-			penOriginal[dt] = append(penOriginal[dt], int32(0))
-		}
 	}
 
 	if len(penOriginal) > 0 {
@@ -416,83 +414,40 @@ func (s *StoreEntry) CowIQR(ctx context.Context, cowInfo *model.Cow, curveName s
 	return q1, q3
 }
 
-func (s *StoreEntry) GetNeckActiveHabitsConcurrent2(cowInfo *model.Cow, cowIds []int64, dayRange []string) ([]*model.NeckActiveHabit, error) {
-	var neckActiveHabitList []*model.NeckActiveHabit
-	var batchSize = 100
-	var wg sync.WaitGroup
-	var mu sync.Mutex
-
-	// 使用工作池控制并发数
-	workerCount := 5 // 并发数,可根据实际情况调整
-	sem := make(chan struct{}, workerCount)
-
-	for i := 0; i < len(cowIds); i += batchSize {
-		end := i + batchSize
-		if end > len(cowIds) {
-			end = len(cowIds)
-		}
-
-		batch := cowIds[i:end]
-
-		wg.Add(1)
-		go func(batch []int64) {
-			defer wg.Done()
-			sem <- struct{}{} // 获取令牌
-
-			var batchResults []*model.NeckActiveHabit
-			err := s.DB.Table(new(model.NeckActiveHabit).TableName()).
-				Select("rumina,intake,inactive,gasp,high,active,active_time").
-				Where("pasture_id = ?", cowInfo.PastureId).
-				Where("is_show = ?", pasturePb.IsShow_Ok).
-				Where("cow_id IN (?)", batch).
-				Where("heat_date BETWEEN ? AND ?", dayRange[0], dayRange[len(dayRange)-1]).
-				Find(&batchResults).Error
-
-			<-sem // 释放令牌
-
-			if err != nil {
-				// 错误处理逻辑,可根据需要调整
-				log.Printf("batch query error: %v", err)
-				return
-			}
-
-			mu.Lock()
-			neckActiveHabitList = append(neckActiveHabitList, batchResults...)
-			mu.Unlock()
-		}(batch)
-	}
-
-	wg.Wait()
-	return neckActiveHabitList, nil
-}
-
 func (s *StoreEntry) GetNeckActiveHabitsConcurrent(cowInfo *model.Cow, cowIds []int64, dayRange []string) ([]*model.NeckActiveHabit, error) {
 	const (
 		batchSize   = 100
-		workerCount = 5 // 并发数,可根据实际情况调整
+		workerCount = 5
 	)
 
-	// 结果收集通道
-	resultsChan := make(chan []*model.NeckActiveHabit, len(cowIds)/batchSize+1)
-	errChan := make(chan error, 1)
+	resultsChan := make(chan []*model.NeckActiveHabit)
+	errChan := make(chan error, 1) // 缓冲 1,避免 goroutine 阻塞
 	doneChan := make(chan struct{})
 
-	// 启动结果收集goroutine
-	var neckActiveHabitList []*model.NeckActiveHabit
+	var (
+		neckActiveHabitList []*model.NeckActiveHabit
+		mu                  sync.Mutex
+	)
+
+	// 结果收集 goroutine
 	go func() {
 		defer close(doneChan)
 		for batchResults := range resultsChan {
+			mu.Lock()
 			neckActiveHabitList = append(neckActiveHabitList, batchResults...)
+			mu.Unlock()
 		}
 	}()
 
-	// 工作池
 	sem := make(chan struct{}, workerCount)
+	var wg sync.WaitGroup
 
 	// 分发任务
 	go func() {
-		defer close(resultsChan)
-		defer close(errChan)
+		defer func() {
+			wg.Wait()          // 等待所有 goroutine 完成
+			close(resultsChan) // 安全关闭
+		}()
 
 		for i := 0; i < len(cowIds); i += batchSize {
 			end := i + batchSize
@@ -501,10 +456,14 @@ func (s *StoreEntry) GetNeckActiveHabitsConcurrent(cowInfo *model.Cow, cowIds []
 			}
 			batch := cowIds[i:end]
 
-			sem <- struct{}{} // 获取令牌
+			wg.Add(1)
+			sem <- struct{}{} // 获取令牌(可能阻塞,但不会死锁)
 
 			go func(batch []int64) {
-				defer func() { <-sem }() // 释放令牌
+				defer func() {
+					<-sem // 释放令牌
+					wg.Done()
+				}()
 
 				var batchResults []*model.NeckActiveHabit
 				if err := s.DB.Table(new(model.NeckActiveHabit).TableName()).
@@ -516,19 +475,16 @@ func (s *StoreEntry) GetNeckActiveHabitsConcurrent(cowInfo *model.Cow, cowIds []
 					Find(&batchResults).Error; err != nil {
 					select {
 					case errChan <- fmt.Errorf("batch query error: %v", err):
-					default: // 避免阻塞,如果已经有错误了
+					default:
 					}
 					return
 				}
 
-				resultsChan <- batchResults
+				if len(batchResults) > 0 {
+					resultsChan <- batchResults
+				}
 			}(batch)
 		}
-
-		// 等待所有worker完成
-		for i := 0; i < cap(sem); i++ {
-			sem <- struct{}{}
-		}
 	}()
 
 	// 等待结果
@@ -539,6 +495,7 @@ func (s *StoreEntry) GetNeckActiveHabitsConcurrent(cowInfo *model.Cow, cowIds []
 		return neckActiveHabitList, nil
 	}
 }
+
 func (s *StoreEntry) CowGrowthCurve(ctx context.Context, req *pasturePb.CowGrowthCurveRequest) (*pasturePb.CowGrowthCurveResponse, error) {
 	userModel, err := s.GetUserModel(ctx)
 	if err != nil {