Просмотр исходного кода

crontab: neck_ring 脖环数据更新

Yi 2 месяцев назад
Родитель
Сommit
47031f7de9

+ 1 - 1
config/app.develop.yaml

@@ -43,7 +43,7 @@ cron:
   update_same_time: "0 20 1 * * ?"    # 每天凌晨1点20分执行
   system_basic_crontab: "0 25 1 * * ?"  # 每天凌晨1点25分执行
   cow_pregnant: "0 00 15 * * ?"         # 每天15点执行
-  update_active_habit: "0 */2 * * * ?"  # 每2分钟执行一次
+  update_active_habit: "0 */1 * * * ?"  # 每2分钟执行一次
   neck_ring_estrus: "0 */5 * * * ?"     # 每5分钟执行一次
 
 mqtt:

+ 5 - 6
dep/di_crontab.go

@@ -4,6 +4,9 @@ import (
 	"kpt-pasture/config"
 	"kpt-pasture/module/crontab"
 
+	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
+	"go.uber.org/zap"
+
 	"go.uber.org/dig"
 
 	"gitee.com/xuyiping_admin/pkg/cron"
@@ -77,13 +80,9 @@ func EntryCrontab(dependency CrontabDependency) *cron.Crontab {
 		panic(err)
 	}*/
 
-	err = newCrontab.Bind("NeckRingEstrus", cs.NeckRingEstrus, dependency.CrontabHub.EntryCowEstrus)
-	if err != nil {
-		panic(err)
-	}
-
-	err = newCrontab.Bind("UpdateActiveHabit", cs.UpdateActiveHabit, dependency.CrontabHub.EntryUpdateActiveHabit)
+	err = newCrontab.Bind("PastureUpdateCowEstrus", cs.NeckRingEstrus, dependency.CrontabHub.PastureUpdateCowEstrus)
 	if err != nil {
+		zaplog.Error("EntryCrontab", zap.Any("PastureUpdateCowEstrus", err))
 		panic(err)
 	}
 

+ 13 - 0
model/app_mqtt.go

@@ -0,0 +1,13 @@
+package model
+
+type AppMqtt struct {
+	Id            int64  `json:"id"`
+	PastureId     int64  `json:"pastureId"`
+	ReceiveNumber string `json:"receiveNumber"`
+	CreatedAt     int64  `json:"createdAt"`
+	UpdatedAt     int64  `json:"updatedAt"`
+}
+
+func (a *AppMqtt) TableName() string {
+	return "app_mqtt"
+}

+ 34 - 0
model/app_pasture_list.go

@@ -0,0 +1,34 @@
+package model
+
+import pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
+
+type AppPastureList struct {
+	Id                   int64                 `json:"id"`
+	FarmId               string                `json:"farmId"`
+	Name                 string                `json:"name"`
+	ShortName            string                `json:"shortName"`
+	GroupId              int64                 `json:"groupId"`
+	Province             string                `json:"province"`
+	City                 string                `json:"city"`
+	County               string                `json:"county"`
+	Address              string                `json:"address"`
+	LegalPersonName      string                `json:"legalPersonName"`
+	LegalPersonPhone     string                `json:"legalPersonPhone"`
+	FactoryDirectorName  string                `json:"factoryDirectorName"`
+	FactoryDirectorPhone string                `json:"factoryDirectorPhone"`
+	Category             int32                 `json:"category"`
+	CurrentScale         string                `json:"currentScale"`
+	PlanScale            string                `json:"planScale"`
+	AppId                string                `json:"appId"`
+	Status               int32                 `json:"status"`
+	IsShow               pasturePb.IsShow_Kind `json:"isShow"`
+	ProductionModel      int32                 `json:"productionModel"`
+	Remarks              string                `json:"remarks"`
+	CreatedName          string                `json:"createdName"`
+	CreatedAt            int64                 `json:"createdAt"`
+	UpdatedAt            string                `json:"updatedAt"`
+}
+
+func (a *AppPastureList) TableName() string {
+	return "app_pasture_list"
+}

+ 13 - 2
model/cow.go

@@ -23,7 +23,7 @@ type Cow struct {
 	CalvingAge          int64                          `json:"calvingAge"`          // 产后天使
 	PregnancyAge        int32                          `json:"pregnancyAge"`        // 怀孕天数 孕检结果有阳性更新,产犊后至0
 	AdmissionAge        int32                          `json:"admissionAge"`        // 入场日龄
-	AbortionAge         int64                          `json:"abortionAge"`         // 流产天数
+	AbortionAge         int32                          `json:"abortionAge"`         // 流产天数
 	CowType             pasturePb.CowType_Kind         `json:"cowType"`             // 牛只类型
 	BreedStatus         pasturePb.BreedStatus_Kind     `json:"breedStatus"`         // 繁殖状态
 	CowKind             pasturePb.CowKind_Kind         `json:"cowKind"`             // 牛只品种
@@ -61,6 +61,17 @@ func (c *Cow) TableName() string {
 	return "cow"
 }
 
+func (c *Cow) EventInfoUpdate() {
+	c.DayAge = c.GetDayAge()
+	c.CalvingAge = c.GetCalvingAge()
+	c.PregnancyAge = c.GetDaysPregnant()
+	c.AdmissionAge = c.GetAdmissionAge()
+	c.AbortionAge = c.GetAbortionAge()
+	if c.DayAge == 60 {
+		c.CowType = pasturePb.CowType_Weaned_Calf
+	}
+}
+
 // EventCalvingUpdate 产犊更新
 func (c *Cow) EventCalvingUpdate(calvingAt int64) {
 	c.Lact += 1
@@ -321,7 +332,7 @@ func NewCow(req *pasturePb.EventEnterRequest) *Cow {
 		FirstMatingAt:       int64(req.MatingAt),
 		LastMatingAt:        int64(req.MatingAt),
 		LastPregnantCheckAt: int64(req.PregnancyCheckAt),
-		AdmissionAt:         time.Now().Unix(),
+		AdmissionAt:         int64(req.BirthAt),
 	}
 }
 

+ 2 - 2
module/mqtt/model.go → model/mqtt.go

@@ -1,4 +1,4 @@
-package mqtt
+package model
 
 type NewBehavior struct {
 	SoftVer  int32   `json:"soft_ver"`
@@ -40,6 +40,6 @@ type Behavior struct {
 type NeckRingWrapper struct {
 	Type     string `json:"type"`
 	NeckRing struct {
-		NeckPck []Behavior `json:"neck"` // neck_pck neck
+		NeckPck []*Behavior `json:"neck"` // neck_pck neck
 	} `json:"NeckRing"`
 }

+ 26 - 22
model/neck_active_habit.go

@@ -7,16 +7,15 @@ import (
 )
 
 const (
-	DefaultChangeFilter  = -10000
-	DefaultRuminaFilter  = -10000
-	DefaultChewFilter    = -10000
+	InitChangeFilter     = -10000
+	DefaultChangeFilter  = -99
 	DefaultFilterCorrect = 100
-	DefaultWeeklyActive  = 105
+	DefaultWeeklyActive  = 1500
 )
 
 type NeckActiveHabit struct {
 	Id                      int64                 `json:"id"`
-	CowId                   int64                 `json:"cowId"`
+	PastureId               int64                 `json:"pastureId"`
 	NeckRingNumber          string                `json:"neckRingNumber"`
 	ActiveTime              string                `json:"activeTime"`
 	Frameid                 int32                 `json:"frameid"`
@@ -58,7 +57,6 @@ type NeckActiveHabit struct {
 	Score                   int32                 `json:"score"`
 	IsMaxTime               pasturePb.IsShow_Kind `json:"isMaxTime"`
 	IsShow                  pasturePb.IsShow_Kind `json:"isShow"`
-	ReceiveNumber           int32                 `json:"receiveNumber"`
 	RecordCount             int32                 `json:"recordCount"`
 	CreatedAt               int64                 `json:"createdAt"`
 	UpdatedAt               int64                 `json:"updatedAt"`
@@ -68,11 +66,12 @@ func (n *NeckActiveHabit) TableName() string {
 	return "neck_active_habit"
 }
 
-func NewNeckActiveHabit(frameId int32, heatDate, neckRingNumber string, data *NeckRingOriginalMerge) *NeckActiveHabit {
+func NewNeckActiveHabit(data *NeckRingOriginalMerge) *NeckActiveHabit {
 	return &NeckActiveHabit{
-		Frameid:        frameId,
-		HeatDate:       heatDate,
-		NeckRingNumber: neckRingNumber,
+		PastureId:      data.PastureId,
+		Frameid:        data.XframeId,
+		HeatDate:       data.ActiveDate,
+		NeckRingNumber: data.NeckRingNumber,
 		Active:         data.Active,
 		Gasp:           data.Gasp,
 		High:           data.High,
@@ -83,20 +82,25 @@ func NewNeckActiveHabit(frameId int32, heatDate, neckRingNumber string, data *Ne
 		WeekHigh:       DefaultWeeklyActive,
 		IsShow:         pasturePb.IsShow_No,
 		IsMaxTime:      pasturePb.IsShow_No,
-		ChangeFilter:   DefaultChangeFilter,
+		ChangeFilter:   InitChangeFilter,
 		FilterCorrect:  DefaultFilterCorrect,
-		RuminaFilter:   DefaultRuminaFilter,
-		ChewFilter:     DefaultChewFilter,
-		ActiveTime:     fmt.Sprintf("%s %02d:00:00", heatDate, frameId),
+		RuminaFilter:   InitChangeFilter,
+		ChewFilter:     InitChangeFilter,
+		ActiveTime:     fmt.Sprintf("%s %02d:00:00", data.ActiveDate, data.XframeId*2),
+		RecordCount:    data.RecordCount,
 	}
 }
 
-func (n *NeckActiveHabit) MergeData(data *NeckActiveHabit) {
-	n.Rumina += data.Rumina
-	n.Inactive += data.Inactive
-	n.Active += data.Active
-	n.Intake += data.Intake
-	n.Other += data.Other
-	n.Gasp += data.Gasp
-	n.High += data.High
+func (n *NeckActiveHabit) SumAvg() {
+	n.Rumina = n.Rumina / n.RecordCount * n.RecordCount
+	n.Inactive = n.Inactive / n.RecordCount * n.RecordCount
+	n.Active = n.Active / n.RecordCount * n.RecordCount
+	n.Intake = n.Intake / n.RecordCount * n.RecordCount
+	n.Other = n.Other / n.RecordCount * n.RecordCount
+	n.Gasp = n.Gasp / n.RecordCount * n.RecordCount
+	n.High = n.High / n.RecordCount * n.RecordCount
+}
+
+type MaxHabitIdModel struct {
+	Id int64 `json:"id"`
 }

+ 20 - 20
model/neck_ring.go

@@ -8,17 +8,17 @@ import (
 )
 
 type NeckRing struct {
-	Id            int64                         `json:"id"`
-	PastureId     int64                         `json:"pastureId"`
-	Number        string                        `json:"number"`
-	CowId         int64                         `json:"cowId"`
-	WearAt        int64                         `json:"wearAt"`
-	Status        pasturePb.NeckRingStatus_Kind `json:"status"`
-	ErrorReason   string                        `json:"errorReason"`
-	OperationId   int32                         `json:"operationId"`
-	OperationName string                        `json:"operationName"`
-	CreatedAt     int64                         `json:"createdAt"`
-	UpdatedAt     int64                         `json:"updatedAt"`
+	Id             int64                         `json:"id"`
+	PastureId      int64                         `json:"pastureId"`
+	NeckRingNumber string                        `json:"neckRingNumber"`
+	CowId          int64                         `json:"cowId"`
+	WearAt         int64                         `json:"wearAt"`
+	Status         pasturePb.NeckRingStatus_Kind `json:"status"`
+	ErrorReason    string                        `json:"errorReason"`
+	OperationId    int32                         `json:"operationId"`
+	OperationName  string                        `json:"operationName"`
+	CreatedAt      int64                         `json:"createdAt"`
+	UpdatedAt      int64                         `json:"updatedAt"`
 }
 
 func (n *NeckRing) TableName() string {
@@ -30,15 +30,15 @@ func (n *NeckRing) EventBindUpdate(cowId int64) {
 	n.WearAt = time.Now().Unix()
 }
 
-func NewNeckRing(pastureId int64, number string, cowId int64, operationUser *SystemUser) *NeckRing {
+func NewNeckRing(pastureId int64, neckRingNumber string, cowId int64, operationUser *SystemUser) *NeckRing {
 	return &NeckRing{
-		PastureId:     pastureId,
-		Number:        number,
-		CowId:         cowId,
-		WearAt:        time.Now().Unix(),
-		Status:        pasturePb.NeckRingStatus_Bind,
-		OperationId:   int32(operationUser.Id),
-		OperationName: operationUser.Name,
+		PastureId:      pastureId,
+		NeckRingNumber: neckRingNumber,
+		CowId:          cowId,
+		WearAt:         time.Now().Unix(),
+		Status:         pasturePb.NeckRingStatus_Bind,
+		OperationId:    int32(operationUser.Id),
+		OperationName:  operationUser.Name,
 	}
 }
 
@@ -69,7 +69,7 @@ func (n NeckRingSlice) ToPB(
 		}
 		res[i] = &pasturePb.SearchNeckRingList{
 			Id:           int32(v.Id),
-			Number:       v.Number,
+			Number:       v.NeckRingNumber,
 			PenId:        penId,
 			PenName:      penName,
 			CowId:        int32(v.CowId),

+ 17 - 17
model/neck_ring_bind_log.go

@@ -3,30 +3,30 @@ package model
 import "time"
 
 type NeckRingBindLog struct {
-	Id            int64  `json:"id"`
-	PastureId     int64  `json:"pastureId"`
-	Number        string `json:"number"`
-	CowId         int64  `json:"cowId"`
-	BindAt        int64  `json:"bindAt"`
-	UnBindAt      int64  `json:"unBindAt"`
-	OperationId   int64  `json:"operationId"`
-	OperationName string `json:"operationName"`
-	CreatedAt     int64  `json:"createdAt"`
-	UpdatedAt     int64  `json:"updatedAt"`
+	Id             int64  `json:"id"`
+	PastureId      int64  `json:"pastureId"`
+	NeckRingNumber string `json:"neckRingNumber"`
+	CowId          int64  `json:"cowId"`
+	BindAt         int64  `json:"bindAt"`
+	UnBindAt       int64  `json:"unBindAt"`
+	OperationId    int64  `json:"operationId"`
+	OperationName  string `json:"operationName"`
+	CreatedAt      int64  `json:"createdAt"`
+	UpdatedAt      int64  `json:"updatedAt"`
 }
 
 func (n *NeckRingBindLog) TableName() string {
 	return "neck_ring_bind_log"
 }
 
-func NewNeckRingBindLog(number string, cowId int64, operationUser *SystemUser) *NeckRingBindLog {
+func NewNeckRingBindLog(neckRingNumber string, cowId int64, operationUser *SystemUser) *NeckRingBindLog {
 	return &NeckRingBindLog{
-		PastureId:     operationUser.PastureId,
-		Number:        number,
-		BindAt:        time.Now().Unix(),
-		CowId:         cowId,
-		OperationId:   operationUser.Id,
-		OperationName: operationUser.Name,
+		PastureId:      operationUser.PastureId,
+		NeckRingNumber: neckRingNumber,
+		BindAt:         time.Now().Unix(),
+		CowId:          cowId,
+		OperationId:    operationUser.Id,
+		OperationName:  operationUser.Name,
 	}
 }
 

+ 72 - 36
model/neck_ring_original.go

@@ -1,8 +1,7 @@
 package model
 
 import (
-	"math"
-	"strconv"
+	"fmt"
 	"strings"
 
 	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
@@ -41,24 +40,45 @@ func (n *NeckRingOriginal) TableName() string {
 	return "neck_ring_original"
 }
 
-var (
-	AvgHours = int32(6)
-	JoinKey  = "/"
-)
+func NewNeckRingOriginal(neckLog *Behavior, pastureMap map[string]int64) *NeckRingOriginal {
+	activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
+	pastureId := int64(0)
+	if neckLog.Frameid%10 == 8 {
+		activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
+	}
 
-func (n *NeckRingOriginal) IsAvgHours() {
-	if n.ActiveDateType == pasturePb.ActiveTimeType_Two_Hours {
-		n.Remain *= AvgHours
-		n.Inactive *= AvgHours
-		n.Active *= AvgHours
-		n.Intake *= AvgHours
-		n.Other *= AvgHours
-		n.Gasp *= AvgHours
-		n.High *= AvgHours
-		n.High = int32(math.Min(1800, float64(n.Other)))
+	if pasture, ok := pastureMap[neckLog.Imei]; ok {
+		pastureId = pasture
+	}
+	return &NeckRingOriginal{
+		Uuid:            neckLog.UUID,
+		PastureId:       pastureId,
+		NeckRingNumber:  fmt.Sprintf("%d", neckLog.Ecowid),
+		Frameid:         neckLog.Frameid,
+		Rumina:          neckLog.Rumina,
+		Intake:          neckLog.Intake,
+		Inactive:        neckLog.Inactive,
+		Gasp:            neckLog.Other,
+		High:            neckLog.Activitys,
+		Active:          neckLog.High,
+		FirmwareVersion: neckLog.Sver,
+		HardwareVersion: neckLog.Hver,
+		Remain:          neckLog.Remain,
+		Voltage:         neckLog.BAT,
+		RestartReason:   neckLog.STATUS,
+		Upper:           neckLog.UpPer,
+		Imei:            neckLog.Imei,
+		ReceiveNumber:   neckLog.Imei,
+		ActiveDateType:  activeDateTimeType,
+		IsShow:          pasturePb.IsShow_No,
 	}
 }
 
+const (
+	AvgHours = int32(6)
+	JoinKey  = "/"
+)
+
 type NeckRingOriginalMerge struct {
 	Rumina         int32
 	Inactive       int32
@@ -67,27 +87,47 @@ type NeckRingOriginalMerge struct {
 	Other          int32
 	High           int32
 	Gasp           int32
+	ActiveDate     string
+	NeckRingNumber string
+	XframeId       int32
 	ActiveDateType pasturePb.ActiveTimeType_Kind
+	RecordCount    int32
+	PastureId      int64
 }
 
-func (n *NeckRingOriginalMerge) IsMageData(data *NeckRingOriginal) {
-	n.Rumina += data.Rumina
-	n.Inactive += data.Inactive
-	n.Active += data.Active
-	n.Intake += data.Intake
-	n.Other += data.Other
-	n.Gasp += data.Gasp
-	n.High += data.High
+func (n *NeckRingOriginalMerge) IsMageData(data *NeckRingOriginal, xframeId int32) {
+	avgParam := int32(1)
+	if n.ActiveDateType == pasturePb.ActiveTimeType_Two_Hours {
+		n.RecordCount = AvgHours
+		avgParam = 6
+	} else {
+		n.RecordCount += 1
+	}
+	high := data.High * avgParam
+	if high > 8800 {
+		high = 8800
+	}
+	n.Rumina += data.Rumina * avgParam
+	n.Inactive += data.Inactive * avgParam
+	n.Active += data.Active * avgParam
+	n.Intake += data.Intake * avgParam
+	n.Other += data.Other * avgParam
+	n.Gasp += data.Gasp * avgParam
+	n.High += high
+	n.ActiveDate = data.ActiveDate
+	n.NeckRingNumber = data.NeckRingNumber
+	n.XframeId = xframeId
+	n.PastureId = data.PastureId
 }
 
 func (n *NeckRingOriginalMerge) SumAvg() {
-	n.Rumina = n.Rumina / AvgHours * AvgHours
-	n.Inactive = n.Inactive / AvgHours * AvgHours
-	n.Active = n.Active / AvgHours * AvgHours
-	n.Intake = n.Intake / AvgHours * AvgHours
-	n.Other = n.Other / AvgHours * AvgHours
-	n.Gasp = n.Gasp / AvgHours * AvgHours
-	n.High = n.High / AvgHours * AvgHours
+	n.Rumina = int32(float32(n.Rumina) / float32(n.RecordCount) * float32(n.RecordCount))
+	n.Inactive = int32(float32(n.Inactive) / float32(n.RecordCount) * float32(n.RecordCount))
+	n.Active = int32(float32(n.Active) / float32(n.RecordCount) * float32(n.RecordCount))
+	n.Intake = int32(float32(n.Intake) / float32(n.RecordCount) * float32(n.RecordCount))
+	n.Other = int32(float32(n.Other) / float32(n.RecordCount) * float32(n.RecordCount))
+	n.Gasp = int32(float32(n.Gasp) / float32(n.RecordCount) * float32(n.RecordCount))
+	n.High = int32(float32(n.High) / float32(n.RecordCount) * float32(n.RecordCount))
 }
 
 type NeckRingOriginalMap map[string]*NeckRingOriginalMerge
@@ -99,11 +139,7 @@ func (n NeckRingOriginalMap) ForMatData() []*NeckActiveHabit {
 		if len(keyStrList) != 3 {
 			continue
 		}
-		neckRingNumber := keyStrList[0]
-		heatDate := keyStrList[1]
-		frameId := keyStrList[2]
-		frameIdInt, _ := strconv.Atoi(frameId)
-		res = append(res, NewNeckActiveHabit(int32(frameIdInt), heatDate, neckRingNumber, v))
+		res = append(res, NewNeckActiveHabit(v))
 	}
 	return res
 }

+ 13 - 12
model/system_configure.go

@@ -3,23 +3,24 @@ package model
 import pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
 
 const (
-	ActiveLowest        = "active_lowest"
-	RuminaLowest        = "rumina_lowest"
-	WeeklyActive        = "weekly_active"
-	XRuminaDisc         = "x_rumina_disc"
-	XChangeDiscount     = "x_change_discount"
-	ActiveLow           = "active_low"
-	ActiveMiddle        = "active_middle"
-	ActiveHigh          = "active_high"
-	MaxEstrus           = "max_estrus"
-	MaxHabit            = "max_habit"
-	UpdateOriginalMaxId = "update_original_max_id"
+	ActiveLowest      = "active_lowest"
+	RuminaLowest      = "rumina_lowest"
+	WeeklyActive      = "weekly_active"
+	XRuminaDisc       = "x_rumina_disc"
+	XChangeDiscount   = "x_change_discount"
+	ActiveLow         = "active_low"
+	ActiveMiddle      = "active_middle"
+	ActiveHigh        = "active_high"
+	MaxEstrus         = "max_estrus"
+	MaxHabit          = "max_habit"
+	UpdateOriginalMax = "original_merge_data"
 )
 
 type SystemConfigure struct {
 	Id        int64                 `json:"id"`
+	PastureId int64                 `json:"pastureId"`
 	Name      string                `json:"name"`
-	Value     int32                 `json:"value"`
+	Value     int64                 `json:"value"`
 	Remarks   string                `json:"remarks"`
 	IsShow    pasturePb.IsShow_Kind `json:"is_show"`
 	CreatedAt int64                 `json:"createdAt"`

+ 3 - 3
module/backend/goods.go

@@ -177,7 +177,7 @@ func (s *StoreEntry) NeckRingCreateOrUpdate(ctx context.Context, req *pasturePb.
 				// 解绑
 			case pasturePb.NeckRingOperationStatus_UnBind:
 				if err = tx.Model(new(model.NeckRing)).
-					Where("number = ?", v.Number).
+					Where("neck_ring_number = ?", v.Number).
 					Updates(map[string]interface{}{
 						"wear_at": 0,
 						"cow_id":  0,
@@ -195,7 +195,7 @@ func (s *StoreEntry) NeckRingCreateOrUpdate(ctx context.Context, req *pasturePb.
 				}
 
 				if err = tx.Model(new(model.NeckRingBindLog)).
-					Where("number = ?", v.Number).
+					Where("neck_ring_number = ?", v.Number).
 					Update("un_bind_at", time.Now().Unix()).Error; err != nil {
 					return xerr.WithStack(err)
 				}
@@ -203,7 +203,7 @@ func (s *StoreEntry) NeckRingCreateOrUpdate(ctx context.Context, req *pasturePb.
 			case pasturePb.NeckRingOperationStatus_Edit:
 				if err = tx.Model(new(model.NeckRing)).
 					Where("cow_id = ?", v.CowId).
-					Update("number", v.Number).Error; err != nil {
+					Update("neck_ring_number", v.Number).Error; err != nil {
 					return xerr.WithStack(err)
 				}
 

+ 4 - 15
module/crontab/cow_cron.go

@@ -71,21 +71,11 @@ func (e *Entry) UpdateCowInfo() error {
 		e.CreateCrontabLog(UpdateCowInfo)
 	}()
 	for _, cow := range cowList {
-		updates := map[string]interface{}{
-			"day_age":       cow.GetDayAge(),
-			"calving_at":    cow.GetCalvingAge(),
-			"pregnancy_age": cow.GetDaysPregnant(),
-			"admission_age": cow.GetAdmissionAge(),
-			"abortion_age":  cow.GetAbortionAge(),
-		}
-
-		if cow.GetDayAge() == 60 {
-			updates["cow_type"] = pasturePb.CowType_Weaned_Calf
-		}
-
+		cow.EventInfoUpdate()
 		if err := e.DB.Model(new(model.Cow)).
+			Select("day_age", "calving_age", "pregnancy_age", "admission_age", "abortion_age", "cow_type").
 			Where("id = ?", cow.Id).
-			Updates(updates).Error; err != nil {
+			Updates(cow).Error; err != nil {
 			zaplog.Error("Crontab", zap.Any("UpdateCowDayAge", err))
 		}
 	}
@@ -113,7 +103,7 @@ func (e *Entry) ImmunizationPlan() error {
 			Select("a.*").
 			Where("a.pasture_id = ?", plan.PastureId).
 			Where("a.admission_status = ?", pasturePb.AdmissionStatus_Admission).
-			Where("NOT EXIST ( select 1 from event_immunization_plan b where b.pen_id = a.id and b.status = ? and b.plan_day > ?)", plan.Id, pasturePb.IsShow_No, nowTime)
+			Where("NOT EXISTS ( select 1 from event_immunization_plan b where b.pen_id = a.id and b.status = ? and b.plan_day > ?)", plan.Id, pasturePb.IsShow_No, nowTime)
 		if plan.CowType > 0 {
 			pref.Where("a.cow_type = ?", plan.CowType)
 		}
@@ -188,7 +178,6 @@ func (e *Entry) SameTimePlan() error {
 	if ok := e.IsExistCrontabLog(SameTimePlan); ok {
 		return nil
 	}
-
 	sameTimeList := make([]*model.SameTime, 0)
 	if err := e.DB.Model(new(model.SameTime)).
 		Where("is_show = ?", pasturePb.IsShow_Ok).

+ 1 - 2
module/crontab/interface.go

@@ -32,6 +32,5 @@ type Crontab interface {
 	UpdateSameTime() error
 	SystemBasicCrontab() error
 
-	EntryUpdateActiveHabit() error // 更新脖环数据 2分钟执行一下
-	EntryCowEstrus() error         // 获取牛只发情数据 5分钟执行一下
+	PastureUpdateCowEstrus() error // 获取牛只发情数据 5分钟执行一下
 }

+ 22 - 22
module/crontab/model.go

@@ -15,7 +15,7 @@ type XToday struct {
 }
 
 type AvgHabit struct {
-	CowId            int64
+	NeckRingNumber   string
 	AvgHighHabit     int32
 	AvgRuminaHabit   int32
 	AvgChewHabit     int32
@@ -25,33 +25,33 @@ type AvgHabit struct {
 }
 
 type SumHabit struct {
-	CowId       int64
-	SumRumina   int32
-	SumIntake   int32
-	SumInactive int32
-	SumActive   int32
-	SumMaxHigh  int32
-	SumMinHigh  int32
-	SumMinChew  int32
+	NeckRingNumber string
+	SumRumina      int32
+	SumIntake      int32
+	SumInactive    int32
+	SumActive      int32
+	SumMaxHigh     int32
+	SumMinHigh     int32
+	SumMinChew     int32
 }
 
 type ChangeFilterData struct {
-	Id           int64
-	CowId        int64
-	HighChange   int32
-	ChangeFilter int32
-	RuminaFilter int32
-	ChangeRumina int32
-	ChewFilter   int32
-	ChangeChew   int32
-	XlcDisCount  float64
+	Id             int64
+	NeckRingNumber string
+	HighChange     int32
+	ChangeFilter   int32
+	RuminaFilter   int32
+	ChangeRumina   int32
+	ChewFilter     int32
+	ChangeChew     int32
+	XlcDisCount    float64
 }
 
 type ActivityVolume struct {
-	CowId     int64
-	AvgFilter int32
-	StdFilter int32
-	Nb        int32
+	NeckRingNumber string
+	AvgFilter      int32
+	StdFilter      int32
+	Nb             int32
 }
 
 type CowEstrusOriginal struct {

+ 53 - 21
module/crontab/neck_ring_estrus.go

@@ -22,19 +22,51 @@ const (
 	NormalChangJust = 10
 )
 
-func (e *Entry) EntryCowEstrus() (err error) {
-	activeLowValue := e.GetSystemConfigure(model.ActiveLow).Value
-	activeMiddleValue := e.GetSystemConfigure(model.ActiveMiddle).Value
-	activeHighValue := e.GetSystemConfigure(model.ActiveHigh).Value
-	lastMaxEstrusId := e.GetSystemConfigure(model.MaxEstrus).Value
+func (e *Entry) PastureUpdateCowEstrus() (err error) {
+	pastureList := e.FindPastureList()
+	if pastureList == nil || len(pastureList) == 0 {
+		return nil
+	}
+
+	for _, pasture := range pastureList {
+		if err = e.EntryCowEstrus(pasture.Id); err != nil {
+			zaplog.Error("EntryCrontab", zap.Any("PastureUpdateCowEstrus", err), zap.Any("pasture", pasture))
+		}
+		zaplog.Error("PastureUpdateCowEstrus-success", zap.Any("pasture", pasture.Id))
+	}
+	return nil
+}
 
+func (e *Entry) EntryCowEstrus(pastureId int64) (err error) {
+	activeLow, err := e.GetSystemConfigure(pastureId, model.ActiveLow)
+	if err != nil {
+		return xerr.WithStack(err)
+	}
+	activeLowValue := int64(activeLow.Value)
+	activeMiddle, err := e.GetSystemConfigure(pastureId, model.ActiveMiddle)
+	if err != nil {
+		return xerr.WithStack(err)
+	}
+	activeMiddleValue := int64(activeMiddle.Value)
+	activeHigh, err := e.GetSystemConfigure(pastureId, model.ActiveHigh)
+	if err != nil {
+		return xerr.WithStack(err)
+	}
+	activeHighValue := int64(activeHigh.Value)
+	lastMaxEstrus, err := e.GetSystemConfigure(pastureId, model.MaxEstrus)
+	if err != nil {
+		return xerr.WithStack(err)
+	}
+	lastMaxEstrusId := int64(lastMaxEstrus.Value)
+	zaplog.Info("EntryCowEstrus-001", zap.Any("lastMaxEstrusId", lastMaxEstrusId))
 	currentMaxHabit := &model.NeckActiveHabit{}
 	if err = e.DB.Model(new(model.NeckActiveHabit)).
+		Where("id > ?", lastMaxEstrusId).
 		Order("id desc").
 		First(currentMaxHabit).Error; err != nil {
 		return xerr.WithStack(err)
 	}
-
+	zaplog.Info("EntryCowEstrus-002", zap.Any("currentMaxHabit", currentMaxHabit))
 	defer func() {
 		if err == nil {
 			e.DB.Model(new(model.SystemConfigure)).
@@ -45,12 +77,12 @@ func (e *Entry) EntryCowEstrus() (err error) {
 
 	xToday := &XToday{}
 	if err = e.DB.Model(new(model.NeckActiveHabit)).
-		Select(`MIN(h.heat_date) as x_beg_date, MAX(h.heat_date) as x_end_date`).
+		Select(`MIN(heat_date) as x_beg_date, MAX(heat_date) as x_end_date`).
 		Where("id BETWEEN ? AND ?", lastMaxEstrusId, currentMaxHabit.Id).
 		First(xToday).Error; err != nil {
 		return xerr.WithStack(err)
 	}
-
+	zaplog.Info("EntryCowEstrus-003", zap.Any("xToday", xToday))
 	// 当前Id<=上次执行的id,则不执行
 	if currentMaxHabit.Id <= int64(lastMaxEstrusId) {
 		return nil
@@ -61,11 +93,11 @@ func (e *Entry) EntryCowEstrus() (err error) {
 	xToday.ActiveLow = int64(activeLowValue)
 	xToday.ActiveMiddle = int64(activeMiddleValue)
 	xToday.ActiveHigh = int64(activeHighValue)
-
+	zaplog.Info("EntryCowEstrus-005", zap.Any("xToday", xToday))
 	if err = e.CowEstrusWarning(xToday); err != nil {
 		return xerr.WithStack(err)
 	}
-
+	zaplog.Info("EntryCowEstrus-006", zap.Any("xToday", xToday))
 	return nil
 }
 
@@ -115,16 +147,21 @@ func (e *Entry) CowEstrusWarning(xToday *XToday) error {
 				}
 				cft = float32(v.ChangeFilter)*float32(v.FilterCorrect)/100 - value
 			}
+			cowInfo := e.FindCowInfoByNeckRingNumber(v.NeckRingNumber)
+			if cowInfo == nil {
+				zaplog.Error("CowEstrusWarning", zap.Any("FindCowInfoByNeckRingNumber", err), zap.Any("NeckRingNumber", v.NeckRingNumber))
+				continue
+			}
 			// 最近3天最大发情记录,小于该变化趋势的不再插入
-			eventEstrus := e.GetBeforeThreeDaysCowEstrus(v.CowId, startDate.AddDate(0, 0, -2).Format(model.LayoutTime))
-			if eventEstrus.CowId != v.CowId {
+			eventEstrus := e.GetBeforeThreeDaysCowEstrus(cowInfo.Id, startDate.AddDate(0, 0, -2).Format(model.LayoutTime))
+			if eventEstrus.CowId != cowInfo.Id {
 				if int32(cft) <= eventEstrus.PerTwentyFourHigh {
 					continue
 				}
 			}
 			// 判断最近50天内是否存在发情记录(发情等级>=2),如果18~25天@xadjust21,如果36~50天@xadjust42
-			cowEstrus := e.GetTwoEstrus(v.CowId, startDate.AddDate(0, 0, -100).Format(model.LayoutTime), startDate.AddDate(0, 0, -2).Format(model.LayoutTime))
-			if cowEstrus.CowId == v.CowId {
+			cowEstrus := e.GetTwoEstrus(cowInfo.Id, startDate.AddDate(0, 0, -100).Format(model.LayoutTime), startDate.AddDate(0, 0, -2).Format(model.LayoutTime))
+			if cowEstrus.CowId == cowInfo.Id {
 				activeDateTime, _ := time.Parse(model.LayoutTime, cowEstrus.ActiveDate)
 				if activeDateTime.Unix() >= startDate.AddDate(0, 0, -25).Unix() && activeDateTime.Unix() <= startDate.AddDate(0, 0, -18).Unix() {
 					cowEstrus.HadJust = XAdjust21
@@ -133,13 +170,8 @@ func (e *Entry) CowEstrusWarning(xToday *XToday) error {
 					cowEstrus.HadJust = XAdjust42
 				}
 			}
-			if int32(cft)+cowEstrus.HadJust <= int32(xToday.ActiveLow) {
-				continue
-			}
 
-			cowInfo, err := e.GetCowById(v.CowId)
-			if err != nil {
-				zaplog.Error("CowEstrusWarning", zap.Any("GetCowById", err), zap.Any("cowId", v.CowId))
+			if int32(cft)+cowEstrus.HadJust <= int32(xToday.ActiveLow) {
 				continue
 			}
 
@@ -166,7 +198,7 @@ func (e *Entry) CowEstrusWarning(xToday *XToday) error {
 			}
 
 			eventEstrusList = append(eventEstrusList, &model.EventEstrus{
-				CowId:             v.CowId,
+				CowId:             cowInfo.Id,
 				Lact:              cowInfo.Lact,
 				ExposeEstrusType:  pasturePb.ExposeEstrusType_Natural_Estrus,
 				FilterHigh:        v.FilterHigh,

+ 27 - 23
module/crontab/sql.go

@@ -1,15 +1,27 @@
 package crontab
 
 import (
-	"errors"
 	"kpt-pasture/model"
 
-	"gorm.io/gorm"
+	"go.uber.org/zap"
+
+	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
 
 	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
 	"gitee.com/xuyiping_admin/pkg/xerr"
 )
 
+func (e *Entry) FindPastureList() []*model.AppPastureList {
+	res := make([]*model.AppPastureList, 0)
+	if err := e.DB.Model(new(model.AppPastureList)).
+		Where("is_show = ?", pasturePb.IsShow_Ok).
+		Find(&res).Error; err != nil {
+		zaplog.Error("FindPastureList error", zap.Any("err", err))
+		return res
+	}
+	return res
+}
+
 func (e *Entry) GetCowById(cowId int64) (*model.Cow, error) {
 	cowInfo := &model.Cow{}
 	if err := e.DB.Model(new(model.Cow)).
@@ -33,33 +45,16 @@ func (e *Entry) GetPenMapList() (map[int32]*model.Pen, error) {
 	return penMap, nil
 }
 
-func (e *Entry) GetSystemConfigure(name string) *model.SystemConfigure {
+func (e *Entry) GetSystemConfigure(pastureId int64, name string) (*model.SystemConfigure, error) {
 	res := &model.SystemConfigure{}
 	if err := e.DB.Model(new(model.SystemConfigure)).
 		Where("name = ?", name).
+		Where("pasture_id = ?", pastureId).
 		Where("is_show = ?", pasturePb.IsShow_Ok).
 		First(res).Error; err != nil {
-		return nil
-	}
-	return res
-}
-
-func (e *Entry) GetMinIdByHeatDate(heatDate string, defaultId int64) (int64, error) {
-	xMinId := struct {
-		Id int64
-	}{}
-	if err := e.DB.Model(new(model.NeckActiveHabit)).
-		Select("MIN(id) as id").
-		//Where("heat_date = ?", minHeatDateParse.AddDate(0, 0, -1).Format(model.LayoutDate2)).
-		Where("heat_date >= ?", heatDate).
-		First(&xMinId).Error; err != nil {
-		if errors.Is(err, gorm.ErrRecordNotFound) {
-			xMinId.Id = defaultId
-		} else {
-			return 0, xerr.WithStack(err)
-		}
+		return nil, xerr.WithStack(err)
 	}
-	return xMinId.Id, nil
+	return res, nil
 }
 
 // GetBeforeThreeDaysCowEstrus 获取值得时间之前三天内最大发情记录
@@ -89,3 +84,12 @@ func (e *Entry) GetTwoEstrus(cowId int64, startActiveTime, endActiveTime string)
 	}
 	return newCowEstrus
 }
+
+func (e *Entry) FindCowInfoByNeckRingNumber(neckRingNumber string) *model.Cow {
+	res := &model.Cow{}
+	if err := e.DB.Model(new(model.Cow)).
+		Where("neck_ring_number = ?", neckRingNumber).First(res).Error; err != nil {
+		return nil
+	}
+	return res
+}

+ 3 - 3
module/crontab/work_cron_test.go

@@ -49,13 +49,13 @@ func TestEntry_SameTimePlan(t *testing.T) {
 			continue
 		}
 		if v.FilterHigh <= 0 {
-			v.FilterHigh = int32(computeIfPositiveElse(float64(v.High), float64(prev.FilterHigh), 0.23, 0.77))
+			v.FilterHigh = int32(mqtt.computeIfPositiveElse(float64(v.High), float64(prev.FilterHigh), 0.23, 0.77))
 		}
 		if v.FilterRumina <= 0 {
-			v.FilterRumina = int32(computeIfPositiveElse(float64(v.Rumina), float64(prev.FilterRumina), 0.33, 0.67))
+			v.FilterRumina = int32(mqtt.computeIfPositiveElse(float64(v.Rumina), float64(prev.FilterRumina), 0.33, 0.67))
 		}
 		if v.FilterChew <= 0 {
-			v.FilterChew = int32(computeIfPositiveElse(float64(v.Rumina+v.Intake), float64(prev.FilterChew), 0.33, 0.67))
+			v.FilterChew = int32(mqtt.computeIfPositiveElse(float64(v.Rumina+v.Intake), float64(prev.FilterChew), 0.33, 0.67))
 		}
 		// 更新过滤值
 		filterValues[v.CowId] = v

+ 1 - 0
module/mqtt/interface.go

@@ -23,4 +23,5 @@ type Entry struct {
 type DataHandle interface {
 	NeckRingHandle(msg []byte)
 	NeckRingOriginalMergeData()
+	PastureUpdateActiveHabit() // 更新脖环数据 2分钟执行一下
 }

+ 195 - 151
module/mqtt/handle.go → module/mqtt/merge_handle.go

@@ -6,15 +6,13 @@ import (
 	"kpt-pasture/model"
 	"kpt-pasture/util"
 	"math"
+	"sort"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/jinzhu/copier"
 
-	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
-
 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
 	"gitee.com/xuyiping_admin/pkg/xerr"
 	"go.uber.org/zap"
@@ -24,7 +22,6 @@ import (
 type DataInsertNeckRingLog struct {
 	NeckRingOriginalData []*model.NeckRingOriginal
 	NeckRingErrorData    []*model.NeckRingError
-	Mx                   *sync.RWMutex
 }
 
 var (
@@ -34,24 +31,125 @@ var (
 	DSMLog       = &DataInsertNeckRingLog{
 		NeckRingOriginalData: make([]*model.NeckRingOriginal, 0),
 		NeckRingErrorData:    make([]*model.NeckRingError, 0),
-		Mx:                   &sync.RWMutex{},
 	}
-	isDelete bool
+	isDelete             bool
+	pastureMqttMap       = make(map[string]int64)
+	isFindPastureMqttMap bool
 )
 
 func (e *Entry) NeckRingHandle(data []byte) {
 	newData := e.MsgDataFormat2(data)
-	if newData == nil || len(newData) <= 0 {
+	if newData == nil {
 		return
 	}
-	zaplog.Info("NeckRingHandle", zap.Any("data", newData), zap.Any("original", string(data)), zap.Any("time", time.Now().Unix()))
-	batchList = append(batchList, newData...)
-	if len(batchList) >= batchSize {
-		e.processBatch(batchList)
-		batchList = batchList[:0] // 清空 batchList
+
+	// 写入数据
+	if err := e.CreatedData(newData); err != nil {
+		zaplog.Error("Failed to create data", zap.Any("err", err), zap.Any("dataList", newData))
 	}
 }
 
+// NeckRingOriginalMergeData 把脖环数据合并成2个小时的
+func (e *Entry) NeckRingOriginalMergeData() {
+	var err error
+	limit := e.Cfg.NeckRingLimit
+	if limit <= 0 {
+		limit = defaultLimit
+	}
+	mergeDataMaxId := e.GetSystemConfigure(model.UpdateOriginalMax).Value
+
+	newTime := time.Now()
+	neckRingList := make([]*model.NeckRingOriginal, 0)
+	if err = e.DB.Model(new(model.NeckRingOriginal)).
+		//Where("is_show = ?", pasturePb.IsShow_No).
+		Where("created_at <= ?", newTime.Add(-1*time.Hour).Unix()).
+		Where("id > ?", mergeDataMaxId).
+		Where("neck_ring_number = ?", "211690").
+		Order("id asc").Limit(int(limit)).
+		Find(&neckRingList).Error; err != nil {
+		return
+	}
+
+	if len(neckRingList) <= 0 {
+		return
+	}
+
+	defer func() {
+		newMergeDataMaxId := neckRingList[len(neckRingList)-1].Id
+		if newMergeDataMaxId > 0 && newMergeDataMaxId > mergeDataMaxId {
+			e.DB.Model(new(model.SystemConfigure)).
+				Where("name = ?", model.UpdateOriginalMax).
+				Update("value", newMergeDataMaxId)
+		}
+
+		if newTime.Day()%15 == 0 && !isDelete {
+			// 原始数据删除15天前的
+			e.DB.Model(new(model.NeckRingOriginal)).
+				Where("created_at < ?", newTime.AddDate(0, 0, -15).Unix()).
+				Delete(new(model.NeckRingOriginal))
+			// 活动数据删除6个月前的数据
+			e.DB.Model(new(model.NeckActiveHabit)).
+				Where("created_at < ?", newTime.AddDate(0, -6, 0).Unix()).
+				Delete(new(model.NeckActiveHabit))
+			isDelete = true
+		}
+	}()
+	// 计算合并
+	newNeckActiveHabitList := e.recalculate(neckRingList)
+	if len(newNeckActiveHabitList) <= 0 {
+		return
+	}
+
+	if err = e.DB.Transaction(func(tx *gorm.DB) error {
+		for _, neckActiveHabit := range newNeckActiveHabitList {
+			//更新脖环牛只相关信息 新数据直接插入
+			historyNeckActiveHabit, ct := e.IsExistNeckActiveHabit(neckActiveHabit.NeckRingNumber, neckActiveHabit.HeatDate, neckActiveHabit.Frameid)
+			if ct <= 0 {
+				if err = tx.Create(neckActiveHabit).Error; err != nil {
+					return xerr.WithStack(err)
+				}
+				tx.Create(model.NewNeckRingProcess(neckActiveHabit))
+				continue
+			}
+
+			if historyNeckActiveHabit == nil {
+				zaplog.Error("NeckRingOriginalMergeData", zap.Any("historyNeckActiveHabit", historyNeckActiveHabit))
+				continue
+			}
+
+			// 重新计算
+			newNeckActiveHabit := e.againRecalculate(historyNeckActiveHabit)
+			if neckActiveHabit != nil {
+				if err = tx.Model(new(model.NeckActiveHabit)).
+					Select("rumina", "intake", "inactive", "gasp", "other", "high", "active").
+					Where("id = ?", historyNeckActiveHabit.Id).
+					Updates(newNeckActiveHabit).Error; err != nil {
+					return xerr.WithStack(err)
+				}
+			}
+		}
+		return nil
+	}); err != nil {
+		zaplog.Error("NeckRingOriginalMergeData", zap.Any("transaction", err))
+		return
+	}
+}
+
+func (e *Entry) FindPastureMqttMap() map[string]int64 {
+	if isFindPastureMqttMap {
+		return pastureMqttMap
+	}
+	appMqttList := make([]*model.AppMqtt, 0)
+	if err := e.DB.Model(new(model.AppMqtt)).Find(&appMqttList).Error; err != nil {
+		zaplog.Error("FindPastureMqttMap", zap.Any("err", err))
+	}
+	for _, v := range appMqttList {
+		pastureMqttMap[v.ReceiveNumber] = v.PastureId
+	}
+	isFindPastureMqttMap = true
+	return pastureMqttMap
+}
+
 // 处理批量数据
 func (e *Entry) processBatch(batchList []*model.NeckRingOriginal) {
 	// 初始化分类数据
@@ -65,7 +163,11 @@ func (e *Entry) processBatch(batchList []*model.NeckRingOriginal) {
 		// 异常脖环数据
 		if ok := util.IsValidFrameId(batch.Frameid); !ok {
 			var ed model.NeckRingError
-			copier.Copy(&ed, &batch)
+			err := copier.Copy(&ed, &batch)
+			if err != nil {
+				zaplog.Error("processBatch", zap.Any("copier", err), zap.Any("data", batch))
+				continue
+			}
 			errorData = append(errorData, &ed)
 		} else {
 			originalData = append(originalData, batch)
@@ -73,8 +175,6 @@ func (e *Entry) processBatch(batchList []*model.NeckRingOriginal) {
 	}
 
 	// 更新日志
-	DSMLog.Mx.Lock()
-	defer DSMLog.Mx.Unlock()
 	DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, errorData...)
 	DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, originalData...)
 
@@ -108,6 +208,86 @@ func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
 	return nil
 }
 
+func (e *Entry) MsgDataFormat2(msg []byte) *DataInsertNeckRingLog {
+	neckLogList := &model.NeckRingWrapper{}
+	if err := json.Unmarshal(msg, neckLogList); err != nil {
+		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
+		return nil
+	}
+
+	normalOriginal := make([]*model.NeckRingOriginal, 0)
+	errorOriginal := make([]*model.NeckRingError, 0)
+	for _, neckLog := range neckLogList.NeckRing.NeckPck {
+		newOriginal := model.NewNeckRingOriginal(neckLog, pastureMqttMap)
+		if ok := util.IsValidFrameId(neckLog.Frameid); !ok {
+			var ed model.NeckRingError
+			if err := copier.Copy(&ed, &newOriginal); err != nil {
+				zaplog.Error("MsgDataFormat2", zap.Any("copier", err), zap.Any("neckLog", neckLog))
+			}
+			errorOriginal = append(errorOriginal, &ed)
+		}
+		activeDate, hours := util.GetNeckRingActiveTimer(neckLog.Frameid)
+		newOriginal.ActiveDate = activeDate
+		newOriginal.Hours = int32(hours)
+		normalOriginal = append(normalOriginal, newOriginal)
+	}
+	return &DataInsertNeckRingLog{
+		NeckRingErrorData:    errorOriginal,
+		NeckRingOriginalData: normalOriginal,
+	}
+}
+
+// recalculate 合并计算
+func (e *Entry) recalculate(neckRingList []*model.NeckRingOriginal) []*model.NeckActiveHabit {
+	originalMapData := make(map[string]*model.NeckRingOriginalMerge)
+	// 合并成2个小时的
+	for _, v := range neckRingList {
+		if v.NeckRingNumber != "211690" {
+			continue
+		}
+		xframeId := int32(math.Floor(float64(v.Frameid) / 10))
+		mapKey := fmt.Sprintf("%s%s%s%s%d", v.NeckRingNumber, model.JoinKey, v.ActiveDate, model.JoinKey, xframeId) // 0001/2023-12-04/0 0001/2023-12-03/4
+		if _, ok := originalMapData[mapKey]; !ok {
+			originalMapData[mapKey] = new(model.NeckRingOriginalMerge)
+		}
+		originalMapData[mapKey].IsMageData(v, xframeId)
+	}
+
+	// 算平均值
+	for _, v := range originalMapData {
+		v.SumAvg()
+	}
+
+	dataList := model.NeckRingOriginalMap(originalMapData).ForMatData()
+	sort.Slice(dataList, func(i, j int) bool {
+		return dataList[i].ActiveTime < dataList[j].ActiveTime
+	})
+	return dataList
+}
+
+func (e *Entry) againRecalculate(data *model.NeckActiveHabit) *model.NeckActiveHabit {
+	originalList := make([]*model.NeckRingOriginal, 0)
+	originalFrameId := data.Frameid
+	frameIds := make([]int32, 0)
+	for i := 1; i <= 6; i++ {
+		frameIds = append(frameIds, originalFrameId*10+int32(i))
+	}
+	if err := e.DB.Model(new(model.NeckRingOriginal)).
+		Where("neck_ring_number = ?", data.NeckRingNumber).
+		Where("active_date = ?", data.HeatDate).
+		Where("frameid IN ?", frameIds).
+		Find(&originalList).Error; err != nil {
+		return nil
+	}
+
+	newDataList := e.recalculate(originalList)
+	zaplog.Info("againRecalculate", zap.Any("newDataList", newDataList))
+	if len(newDataList) != 1 {
+		return nil
+	}
+	return newDataList[0]
+}
+
 func (e *Entry) MsgDataFormat(msg []byte) []*model.NeckRingOriginal {
 	msgData := make(map[string]interface{})
 	pairs := strings.Split(util.MsgFormat(string(msg)), " ")
@@ -281,139 +461,3 @@ func (e *Entry) MsgDataFormat(msg []byte) []*model.NeckRingOriginal {
 		},
 	}
 }
-
-func (e *Entry) MsgDataFormat2(msg []byte) []*model.NeckRingOriginal {
-	neckLogList := &NeckRingWrapper{}
-	if err := json.Unmarshal(msg, neckLogList); err != nil {
-		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
-	}
-
-	res := make([]*model.NeckRingOriginal, 0)
-	for _, neckLog := range neckLogList.NeckRing.NeckPck {
-		activeDate, hours := util.GetNeckRingActiveTimer(neckLog.Frameid)
-		activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
-		if neckLog.Frameid%10 == 8 {
-			activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
-		}
-		res = append(res, &model.NeckRingOriginal{
-			Uuid:            neckLog.UUID,
-			NeckRingNumber:  fmt.Sprintf("%d", neckLog.Ecowid),
-			ActiveDate:      activeDate,
-			Hours:           int32(hours),
-			Frameid:         neckLog.Frameid,
-			Rumina:          neckLog.Rumina,
-			Intake:          neckLog.Intake,
-			Inactive:        neckLog.Inactive,
-			Gasp:            neckLog.Other,
-			High:            neckLog.Activitys,
-			Active:          neckLog.High,
-			FirmwareVersion: neckLog.Sver,
-			HardwareVersion: neckLog.Hver,
-			Remain:          neckLog.Remain,
-			Voltage:         neckLog.BAT,
-			RestartReason:   neckLog.STATUS,
-			Upper:           neckLog.UpPer,
-			Imei:            neckLog.Imei,
-			ReceiveNumber:   neckLog.Imei,
-			ActiveDateType:  activeDateTimeType,
-			IsShow:          pasturePb.IsShow_No,
-		})
-	}
-	return res
-}
-
-// NeckRingOriginalMergeData 把脖环数据合并成2个小时的
-func (e *Entry) NeckRingOriginalMergeData() {
-	limit := e.Cfg.NeckRingLimit
-	if limit <= 0 {
-		limit = defaultLimit
-	}
-
-	updateOriginalMaxId := e.GetSystemConfigure(model.UpdateOriginalMaxId).Value
-	neckRingList := make([]*model.NeckRingOriginal, 0)
-	if err := e.DB.Model(new(model.NeckRingOriginal)).
-		Where("id > ?", updateOriginalMaxId).
-		Order("id asc").Limit(int(limit)).
-		Find(&neckRingList).Error; err != nil {
-		return
-	}
-
-	if len(neckRingList) <= 0 {
-		return
-	}
-
-	defer func() {
-		currTime := time.Now()
-		// 删除15天前的数据
-		if currTime.Day()%15 == 0 && !isDelete {
-			e.DB.Model(new(model.NeckRingOriginal)).
-				Where("active_date < ?", currTime.AddDate(0, 0, -15).Format(model.LayoutDate2)).
-				Delete(new(model.NeckRingOriginal))
-			e.DB.Model(new(model.NeckRingProcess)).
-				Where("active_date < ?", currTime.AddDate(0, 0, -5).Format(model.LayoutDate2)).
-				Delete(new(model.NeckRingProcess))
-			isDelete = true
-		}
-	}()
-
-	originalMapData := make(map[string]*model.NeckRingOriginalMerge)
-	// 合并成2个小时的
-	for _, v := range neckRingList {
-		xframeId := int(math.Floor(float64(v.Frameid)/10) * 2)
-		mapKey := fmt.Sprintf("%s%s%s%s%d", v.NeckRingNumber, model.JoinKey, v.ActiveDate, model.JoinKey, xframeId) // 0001/2023-12-04/0 0001/2023-12-03/4
-		if _, ok := originalMapData[mapKey]; !ok {
-			originalMapData[mapKey] = new(model.NeckRingOriginalMerge)
-		}
-		v.IsAvgHours()
-		originalMapData[mapKey].IsMageData(v)
-	}
-
-	// 算平均值
-	for _, v := range originalMapData {
-		v.SumAvg()
-	}
-
-	zaplog.Info("NeckRingOriginalMergeData", zap.Any("originalMapData", originalMapData))
-	// 更新脖环牛只相关信息
-	newNeckActiveHabitList := model.NeckRingOriginalMap(originalMapData).ForMatData()
-	if err := e.DB.Transaction(func(tx *gorm.DB) error {
-		// 更新已处理过的id
-		processMaxId := neckRingList[len(neckRingList)-1].Id
-		fmt.Println("updateOriginalMaxId", processMaxId)
-		if err := tx.Model(new(model.SystemConfigure)).
-			Where("name = ?", model.UpdateOriginalMaxId).
-			Update("value", processMaxId).
-			Error; err != nil {
-			return xerr.WithStack(err)
-		}
-
-		for _, neckActiveHabit := range newNeckActiveHabitList {
-			// 新数据直接插入
-			historyNeckActiveHabit, ct := e.IsExistNeckActiveHabit(neckActiveHabit.NeckRingNumber, neckActiveHabit.HeatDate, neckActiveHabit.Frameid)
-			if ct <= 0 {
-				if err := tx.Create(neckActiveHabit).Error; err != nil {
-					return xerr.WithStack(err)
-				}
-				tx.Create(model.NewNeckRingProcess(neckActiveHabit))
-				continue
-			}
-
-			if historyNeckActiveHabit == nil {
-				zaplog.Error("NeckRingOriginalMergeData", zap.Any("historyNeckActiveHabit", historyNeckActiveHabit))
-				continue
-			}
-			// 更新数据
-			historyNeckActiveHabit.MergeData(neckActiveHabit)
-			if err := tx.Model(new(model.NeckActiveHabit)).
-				Select("rumina", "intake", "inactive", "gasp", "other", "high", "active").
-				Where("id = ?", historyNeckActiveHabit.Id).
-				Updates(historyNeckActiveHabit).Error; err != nil {
-				return xerr.WithStack(err)
-			}
-		}
-		return nil
-	}); err != nil {
-		zaplog.Error("NeckRingOriginalMergeData", zap.Any("transaction", err))
-		return
-	}
-}

+ 152 - 100
module/crontab/neck_ring_habit.go → module/mqtt/neck_ring_habit.go

@@ -1,11 +1,15 @@
-package crontab
+package mqtt
 
 import (
 	"fmt"
 	"kpt-pasture/model"
+	"kpt-pasture/module/crontab"
 	"math"
 	"time"
 
+	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
+	"go.uber.org/zap"
+
 	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
 	"gitee.com/xuyiping_admin/pkg/xerr"
 )
@@ -18,37 +22,56 @@ const (
 	DefaultNb       = 30
 )
 
-func (e *Entry) EntryUpdateActiveHabit() (err error) {
-	lastMaxHabitId := e.GetSystemConfigure(model.MaxHabit).Value
+func (e *Entry) PastureUpdateActiveHabit() {
+	pastureList := e.FindPastureList()
+	if pastureList == nil || len(pastureList) == 0 {
+		return
+	}
+
+	for _, pasture := range pastureList {
+		if err := e.EntryUpdateActiveHabit(pasture.Id); err != nil {
+			zaplog.Error("PastureUpdateActiveHabit", zap.Any("PastureUpdateActiveHabit", err), zap.Any("pasture", pasture))
+		}
+		zaplog.Info(fmt.Sprintf("PastureUpdateActiveHabit Success %d", pasture.Id))
+	}
+}
+
+func (e *Entry) EntryUpdateActiveHabit(pastureId int64) (err error) {
+	lastMaxHabitData, err := e.GetSystemConfigure2(pastureId, model.MaxHabit)
+	if err != nil {
+		return xerr.WithStack(err)
+	}
+
+	lastMaxHabitId := lastMaxHabitData.Value
 	currentMaxHabit := &model.NeckActiveHabit{}
-	if err = e.DB.Model(new(model.NeckActiveHabit)).
+	if err = e.DB.Model(currentMaxHabit).
 		Where("id > ?", lastMaxHabitId).
 		Order("id desc").First(currentMaxHabit).Error; err != nil {
 		return xerr.WithStack(err)
 	}
-
 	// 本次执行<=上次执行的id,则不执行
-	if currentMaxHabit.Id < int64(lastMaxHabitId) {
+	if currentMaxHabit.Id < lastMaxHabitId {
 		return nil
 	}
 
 	// 统一更新is_max_time为0
 	if err = e.DB.Model(new(model.NeckActiveHabit)).
 		Where("is_max_time = ?", pasturePb.IsShow_Ok).
+		Where("heat_date > ?", time.Now().AddDate(0, 0, -10).Format(model.LayoutDate2)).
 		Update("is_max_time", pasturePb.IsShow_No).Error; err != nil {
 		return xerr.WithStack(err)
 	}
 
 	// 获取这段执行数据内最大日期和最小日期
-	xToday := &XToday{}
+	xToday := &crontab.XToday{}
 	if err = e.DB.Model(new(model.NeckActiveHabit)).
-		Select(`MIN(h.heat_date) as x_beg_date, MAX(h.heat_date) as x_end_date`).
-		Where("id BETWEEN ? AND ?", lastMaxHabitId, currentMaxHabit).
+		Select(`MIN(heat_date) as x_beg_date, MAX(heat_date) as x_end_date`).
+		Where("id BETWEEN ? AND ?", lastMaxHabitId, currentMaxHabit.Id).
 		First(xToday).Error; err != nil {
 		return xerr.WithStack(err)
 	}
 
-	xToday.LastMaxHabitId = int64(lastMaxHabitId)
+	xToday.LastMaxHabitId = lastMaxHabitId
 	xToday.CurrMaxHabitId = currentMaxHabit.Id
 
 	minHeatDateParse, err := time.Parse(model.LayoutDate2, xToday.XBegDate)
@@ -71,6 +94,7 @@ func (e *Entry) EntryUpdateActiveHabit() (err error) {
 		if err == nil {
 			e.DB.Model(new(model.SystemConfigure)).
 				Where("name = ?", model.MaxHabit).
+				Where("pasture_id = ?", pastureId).
 				Update("value", currentMaxHabit.Id)
 		}
 	}()
@@ -78,51 +102,72 @@ func (e *Entry) EntryUpdateActiveHabit() (err error) {
 	xToday.XMin7Id = xMin7Id
 
 	//  id到上一次执行结果并且heat_date > 7天之前的最大牛只id置为is_max_time=1
-	sqlQuery := e.DB.Model(new(model.NeckActiveHabit)).
-		Select("MAX(id) as id").
-		Where("id BETWEEN ? AND ?", xToday.XMin2Id, xToday.LastMaxHabitId).
-		Where("change_filter > ?", MinChangeFilter).
-		Where("heat_date >", xBefore7Day).
-		Group("cow_id")
-
+	maxHabitIdArray := make([]*model.MaxHabitIdModel, 0)
 	if err = e.DB.Model(new(model.NeckActiveHabit)).
-		Joins("JOIN (?) bb ON neck_active_habit.id = bb.id", sqlQuery).
-		Update("is_max_time", pasturePb.IsShow_Ok).Error; err != nil {
+		Select("Max(id) as id").
+		Where("change_filter > ?", model.DefaultChangeFilter).
+		Where("heat_date > ?", xBefore7Day).
+		Group("neck_ring_number").
+		Find(&maxHabitIdArray).Error; err != nil {
 		return xerr.WithStack(err)
 	}
 
-	activeLowest := e.GetSystemConfigure(model.ActiveLowest)
-	ruminaLowest := e.GetSystemConfigure(model.RuminaLowest)
-	xToday.ActiveLowest = int64(activeLowest.Value)
-	xToday.RuminaLowest = int64(ruminaLowest.Value)
+	if len(maxHabitIdArray) > 0 {
+		maxHabitIds := make([]int64, 0)
+		for _, v := range maxHabitIdArray {
+			maxHabitIds = append(maxHabitIds, v.Id)
+		}
+		if err = e.DB.Model(new(model.NeckActiveHabit).TableName()).
+			Where("id IN (?)", maxHabitIds).
+			Update("is_max_time", pasturePb.IsShow_Ok).Error; err != nil {
+			return xerr.WithStack(err)
+		}
+	}
 
+	activeLowest, err := e.GetSystemConfigure2(pastureId, model.ActiveLowest)
+	if err != nil {
+		return xerr.WithStack(err)
+	}
+	ruminaLowest, err := e.GetSystemConfigure2(pastureId, model.RuminaLowest)
+	if err != nil {
+		return xerr.WithStack(err)
+	}
+	xToday.ActiveLowest = activeLowest.Value
+	xToday.RuminaLowest = ruminaLowest.Value
 	// 更新活动滤波
-	if err = e.FilterUpdate(xToday); err != nil {
+	if err = e.FilterUpdate(pastureId, xToday); err != nil {
+		zaplog.Error("EntryUpdateActiveHabit", zap.Any("FilterUpdate", err), zap.Any("xToday", xToday))
 		return xerr.WithStack(err)
 	}
 	// 更新周平均值
-	if err = e.WeeklyActiveAvgUpdate(xToday); err != nil {
+	if err = e.WeeklyActiveAvgUpdate(pastureId, xToday); err != nil {
+		zaplog.Error("EntryUpdateActiveHabit", zap.Any("WeeklyActiveAvgUpdate", err), zap.Any("xToday", xToday))
 		return xerr.WithStack(err)
 	}
 
+	if err = e.ActivityVolumeChanges(pastureId, xToday); err != nil {
+		zaplog.Error("EntryUpdateActiveHabit", zap.Any("ActivityVolumeChanges", err), zap.Any("xToday", xToday))
+		return xerr.WithStack(err)
+	}
 	return nil
 }
 
 // FilterUpdate 更新活动滤波
-func (e *Entry) FilterUpdate(xToDay *XToday) error {
+func (e *Entry) FilterUpdate(pastureId int64, xToDay *crontab.XToday) error {
 	newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0)
 	if err := e.DB.Model(new(model.NeckActiveHabit)).
-		Where(e.DB.Where("change_filter = ?", model.DefaultChangeFilter).Or("is_max_time = ?", pasturePb.IsShow_Ok)).
+		Where("pasture_id = ?", pastureId).
+		Where(e.DB.Where("change_filter = ?", model.InitChangeFilter).Or("is_max_time = ?", pasturePb.IsShow_Ok)).
 		Where(e.DB.Where("high >= ?", xToDay.ActiveLowest).Or("rumina >= ?", xToDay.RuminaLowest)).
-		Order("cow_id,id").
+		Order("neck_ring_number,id").
 		Find(&newNeckActiveHabitList).Error; err != nil {
 		return xerr.WithStack(err)
 	}
 
-	var filterValues = make(map[int64]*model.NeckActiveHabit)
+	var filterValues = make(map[string]*model.NeckActiveHabit)
 	// 活动量滤波
 	for _, v := range newNeckActiveHabitList {
-		prev, ok := filterValues[v.CowId]
+		prev, ok := filterValues[v.NeckRingNumber]
 		if !ok {
 			if v.FilterHigh <= 0 {
 				v.FilterHigh = v.High
@@ -133,7 +178,7 @@ func (e *Entry) FilterUpdate(xToDay *XToday) error {
 			if v.FilterChew <= 0 {
 				v.FilterChew = v.Rumina + v.Intake
 			}
-			filterValues[v.CowId] = v
+			filterValues[v.NeckRingNumber] = v
 			continue
 		}
 		if v.FilterHigh <= 0 {
@@ -146,7 +191,7 @@ func (e *Entry) FilterUpdate(xToDay *XToday) error {
 			v.FilterChew = int32(computeIfPositiveElse(float64(v.Rumina+v.Intake), float64(prev.FilterChew), 0.33, 0.67))
 		}
 		// 更新过滤值
-		filterValues[v.CowId] = v
+		filterValues[v.NeckRingNumber] = v
 		if err := e.DB.Model(new(model.NeckActiveHabit)).
 			Select("filter_high", "filter_rumina", "filter_chew").
 			Where("id = ?", v.Id).
@@ -154,10 +199,11 @@ func (e *Entry) FilterUpdate(xToDay *XToday) error {
 			return xerr.WithStack(err)
 		}
 	}
+	zaplog.Info("EntryUpdateActiveHabit-FilterUpdate-Success")
 	return nil
 }
 
-func (e *Entry) WeeklyActiveAvgUpdate(xToday *XToday) error {
+func (e *Entry) WeeklyActiveAvgUpdate(pastureId int64, xToday *crontab.XToday) error {
 	beginDayDate, err := time.Parse(model.LayoutDate2, xToday.XBegDate)
 	if err != nil {
 		return xerr.WithStack(err)
@@ -165,16 +211,19 @@ func (e *Entry) WeeklyActiveAvgUpdate(xToday *XToday) error {
 	before7DayDate := beginDayDate.AddDate(0, 0, -7).Format(model.LayoutDate2)
 	before1DayDate := beginDayDate.AddDate(0, 0, -1).Format(model.LayoutDate2)
 
-	weeklyActive := e.GetSystemConfigure(model.WeeklyActive)
+	weeklyActive, err := e.GetSystemConfigure2(pastureId, model.WeeklyActive)
+	if err != nil {
+		return xerr.WithStack(err)
+	}
 	xframeId := int64(0)
 	maxXframeId := int64(11)
 	xStartDate, _ := time.Parse(model.LayoutDate2, xToday.XBegDate)
 	xEndDate, _ := time.Parse(model.LayoutDate2, xToday.XEndDate)
 	for xStartDate.Format(model.LayoutDate2) < xEndDate.Format(model.LayoutDate2) || (xStartDate == xEndDate && xframeId <= maxXframeId) {
 		//  时间点周平均
-		AvgHabitList := make([]*AvgHabit, 0)
+		AvgHabitList := make([]*crontab.AvgHabit, 0)
 		if err = e.DB.Model(new(model.NeckActiveHabit)).
-			Select("cow_id").
+			Select("neck_ring_number").
 			Select("IF(COUNT(1)>=3, ROUND((SUM(filter_high) -MIN(filter_high) -MAX(filter_high))/ABS(COUNT(1) -2),0), -1) as avg_high_habit").
 			Select("IF(COUNT(1)>=3, ROUND((SUM(filter_rumina) -MIN(filter_rumina) -MAX(filter_rumina))/ABS(COUNT(1) -2),0), -1) as avg_rumina_habit").
 			Select("IF(COUNT(1)>=3, ROUND((SUM(filter_chew) -MIN(filter_chew) -MAX(filter_chew))/ABS(COUNT(1) -2),0), -1) as avg_chew_habit").
@@ -183,19 +232,20 @@ func (e *Entry) WeeklyActiveAvgUpdate(xToday *XToday) error {
 			Where("id BETWEEN ? AND ?", xToday.XMin7Id, xToday.CurrMaxHabitId).
 			Where("heat_date BETWEEN ? AND ?", before7DayDate, before1DayDate).
 			Where("frameid = ?", xframeId).
-			Where("change_filter = ?", model.DefaultChangeFilter).
+			Where("pasture_id = ?", pastureId).
+			Where("change_filter = ?", model.InitChangeFilter).
 			Where(e.DB.Where("high > ?", xToday.ActiveLowest).Or("rumina > ?", xToday.RuminaLowest)).
-			Group("cow_id").
+			Group("neck_ring_number").
 			Find(&AvgHabitList).Error; err != nil {
 			return xerr.WithStack(err)
 		}
 		for _, v := range AvgHabitList {
-			if err := e.DB.Model(new(model.NeckActiveHabit)).
-				Select("week_avg_high_habit", "avg_rumina_habit", "avg_chew_habit", "avg_intake_habit", "avg_inactive_habit").
-				Where("cow_id = ?", v.CowId).
+			if err = e.DB.Model(new(model.NeckActiveHabit)).
+				Select("avg_high_habit", "avg_rumina_habit", "avg_chew_habit", "avg_intake_habit", "avg_inactive_habit").
+				Where("neck_ring_number = ?", v.NeckRingNumber).
 				Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
 				Where("frameid = ?", xframeId).
-				Where("change_filter = ?", model.DefaultChangeFilter).
+				Where("change_filter = ?", model.InitChangeFilter).
 				Where("heat_date = ?", xStartDate).
 				Updates(v).Error; err != nil {
 				return xerr.WithStack(err)
@@ -203,9 +253,9 @@ func (e *Entry) WeeklyActiveAvgUpdate(xToday *XToday) error {
 		}
 
 		// 累计24小时数值
-		sumHabitList := make([]*SumHabit, 0)
+		sumHabitList := make([]*crontab.SumHabit, 0)
 		if err = e.DB.Model(new(model.NeckActiveHabit)).
-			Select("cow_id").
+			Select("neck_ring_number").
 			Select("IF(COUNT(1)>6, ROUND(AVG( h2.filter_rumina)*12,0), 0) as sum_rumina").
 			Select("IF(COUNT(1)>6, ROUND(AVG( h2.intake)*12,0), 0) as sum_intake").
 			Select("IF(COUNT(1)>6, ROUND(AVG( h2.inactive)*12,0), 0) as sum_inactive").
@@ -214,10 +264,11 @@ func (e *Entry) WeeklyActiveAvgUpdate(xToday *XToday) error {
 			Select("MIN(IF(change_filter > ?, change_filter, 0)) as sum_min_high", MinChangeFilter).
 			Select("MIN( CASE WHEN filter_chew > ? THEN filter_chew WHEN filter_rumina >= ? THEN filter_rumina ELSE 0 END) as sum_min_chew", MinChangeFilter, MinRuminaFilter).
 			Where("id BETWEEN ? AND ?", xToday.XMin2Id, xToday.CurrMaxHabitId).
+			Where("pasture_id = ?", pastureId).
 			Where("heat_date BETWEEN ? AND ?", xStartDate.AddDate(0, 0, -1).Format(model.LayoutDate2), xStartDate.Format(model.LayoutDate2)).
 			Where("created_at BETWEEN ? AND ?", xStartDate.Add(-23*time.Hour).Unix(), xStartDate.Unix()).
 			Where(e.DB.Where("high > ?", xToday.ActiveLowest).Or("rumina >= ?", xToday.RuminaLowest)).
-			Group("cow_id").
+			Group("neck_ring_number").
 			Find(&sumHabitList).Error; err != nil {
 			return xerr.WithStack(err)
 		}
@@ -225,11 +276,11 @@ func (e *Entry) WeeklyActiveAvgUpdate(xToday *XToday) error {
 		for _, v := range sumHabitList {
 			if err = e.DB.Model(new(model.NeckActiveHabit)).
 				Select("sum_rumina", "sum_intake", "sum_inactive", "sum_active", "sum_max_high", "sum_min_high", "sum_min_chew").
-				Where("cow_id = ?", v.CowId).
+				Where("neck_ring_number = ?", v.NeckRingNumber).
 				Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
 				Where("heat_date = ?", xStartDate.Format(model.LayoutDate2)).
 				Where("frameid = ?", xframeId).
-				Where("change_filter = ?", model.DefaultChangeFilter).
+				Where("change_filter = ?", model.InitChangeFilter).
 				Updates(v).Error; err != nil {
 				return xerr.WithStack(err)
 			}
@@ -240,8 +291,9 @@ func (e *Entry) WeeklyActiveAvgUpdate(xToday *XToday) error {
 			Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
 			Where("heat_date = ?", xStartDate.Format(model.LayoutDate2)).
 			Where("frameid = ?", xframeId).
-			Where("change_filter = ?", model.DefaultChangeFilter).
-			Where("week_avg_high_habit > ?", 0).
+			Where("pasture_id = ?", pastureId).
+			Where("change_filter = ?", model.InitChangeFilter).
+			Where("avg_high_habit > ?", 0).
 			Where(e.DB.Where("high > ?", xToday.ActiveLowest).Or("rumina >= ?", xToday.RuminaLowest)).
 			Find(&changeHabitList).Error; err != nil {
 			return xerr.WithStack(err)
@@ -271,28 +323,35 @@ func (e *Entry) WeeklyActiveAvgUpdate(xToday *XToday) error {
 			xframeId++
 		}
 	}
-
+	zaplog.Info("EntryUpdateActiveHabit-WeeklyActiveAvgUpdate-Success")
 	return nil
 }
 
 // UpdateChangeFilter  变化趋势滤波
-func (e *Entry) UpdateChangeFilter(xToday *XToday) error {
-	xRuminaDisc := e.GetSystemConfigure(model.XRuminaDisc)
-	xChangeDiscount := e.GetSystemConfigure(model.XChangeDiscount)
-	newChangeFilterList := make([]*ChangeFilterData, 0)
-	if err := e.DB.Model(new(model.NeckActiveHabit)).
-		Select("id,cow_id,change_high,change_filter,rumina_filter,change_rumina,chew_filter,change_chew").
+func (e *Entry) UpdateChangeFilter(pastureId int64, xToday *crontab.XToday) (err error) {
+	xRuminaDisc, err := e.GetSystemConfigure2(pastureId, model.XRuminaDisc)
+	if err != nil {
+		return err
+	}
+	xChangeDiscount, err := e.GetSystemConfigure2(pastureId, model.XChangeDiscount)
+	if err != nil {
+		return err
+	}
+	newChangeFilterList := make([]*crontab.ChangeFilterData, 0)
+	if err = e.DB.Model(new(model.NeckActiveHabit)).
+		Select("id,neck_ring_number,change_high,change_filter,rumina_filter,change_rumina,chew_filter,change_chew").
 		Select("IF(lact=0,0.8,1) as xlc_dis_count").
 		Where("id BETWEEN ? AND ?", xToday.XMin2Id, xToday.CurrMaxHabitId).
-		Where(e.DB.Where("change_filter = ?", model.DefaultChangeFilter).Or("is_max_time = ?", pasturePb.IsShow_Ok)).
+		Where("pasture_id = ?", pastureId).
+		Where(e.DB.Where("change_filter = ?", model.InitChangeFilter).Or("is_max_time = ?", pasturePb.IsShow_Ok)).
 		Where("change_high > ?", MinChangeHigh).
-		Order("cow_id,heat_date,frameid").
+		Order("neck_ring_number,heat_date,frameid").
 		Find(&newChangeFilterList).Error; err != nil {
 		return xerr.WithStack(err)
 	}
-	var filterValues = make(map[int64]*ChangeFilterData)
+	var filterValues = make(map[string]*crontab.ChangeFilterData)
 	for _, v := range newChangeFilterList {
-		prev, ok := filterValues[v.CowId]
+		prev, ok := filterValues[v.NeckRingNumber]
 		if v.ChangeFilter <= MinChangeFilter {
 			prefChangeFilter := int32(0)
 			if ok {
@@ -342,29 +401,29 @@ func (e *Entry) UpdateChangeFilter(xToday *XToday) error {
 		if v.ChewFilter > 50 {
 			v.ChangeChew = 50
 		}
-		if err := e.DB.Model(new(model.NeckActiveHabit)).
+		if err = e.DB.Model(new(model.NeckActiveHabit)).
 			Select("change_filter", "rumina_filter", "chew_filter").
 			Where("id = ?", v.Id).
-			Where("cow_id = ?", v.CowId).
-			Where("change_filter = ?", model.DefaultChangeFilter).
+			Where("neck_ring_number = ?", v.NeckRingNumber).
+			Where("change_filter = ?", model.InitChangeFilter).
 			Updates(v).Error; err != nil {
 			return xerr.WithStack(err)
 		}
-		filterValues[v.CowId] = v
+		filterValues[v.NeckRingNumber] = v
 	}
 
 	return nil
 }
 
 // ActivityVolumeChanges 计算活动量变化趋势校正值(活跃度校正)
-func (e *Entry) ActivityVolumeChanges(xToday *XToday) error {
+func (e *Entry) ActivityVolumeChanges(pastureId int64, xToday *crontab.XToday) error {
 	currDate, _ := time.Parse(model.LayoutDate2, xToday.XBegDate)
 	XEndDateTime, _ := time.Parse(model.LayoutDate2, xToday.XEndDate)
 	xframeId := int64(0)
 	maxXframeId := int64(11)
 	dayTimes := int64(1)
 	for currDate.Format(model.LayoutDate2) < XEndDateTime.Format(model.LayoutDate2) || (currDate == XEndDateTime && xframeId <= maxXframeId) {
-		activityVolumeList := make([]*ActivityVolume, 0)
+		activityVolumeList := make([]*crontab.ActivityVolume, 0)
 		activeTime := fmt.Sprintf("%s %02d:00:00", currDate.Format(model.LayoutDate2), xframeId*2)
 		activeTimeParse, err := time.Parse(model.LayoutTime, activeTime)
 		if err != nil {
@@ -372,18 +431,19 @@ func (e *Entry) ActivityVolumeChanges(xToday *XToday) error {
 		}
 		if dayTimes == 1 {
 			if err = e.DB.Model(new(model.NeckActiveHabit)).
-				Select("cow_id").
+				Select("neck_ring_number").
 				Select("AVG(IF(change_filter>=60, 60, change_filter)) as avg_filter").
 				Select("ROUND(STD(IF(change_filter>=60, 60, change_filter))) as std_filter").
 				Select("COUNT(1) as nb").
 				Where("id BETWEEN ? AND ?", xToday.XMin7Id, xToday.CurrMaxHabitId).
 				Where("heat_date BETWEEN ? AND ?", currDate.AddDate(0, 0, -7).Format(model.LayoutDate2), currDate.AddDate(0, 0, -1).Format(model.LayoutDate2)).
 				Where("frameid = ?", xframeId).
+				Where("pasture_id = ?", pastureId).
 				Where(e.DB.Where("high > ?", xToday.ActiveLowest).Or("rumina >= ?", xToday.RuminaLowest)).
 				Where("active_time <= ?", activeTimeParse.Add(-12*time.Hour)).
 				Where("change_filter > ?", MinChangeFilter).
 				Having("nb > ?", DefaultNb).
-				Group("cow_id").
+				Group("neck_ring_number").
 				Find(&activityVolumeList).Error; err != nil {
 				return xerr.WithStack(err)
 			}
@@ -392,7 +452,7 @@ func (e *Entry) ActivityVolumeChanges(xToday *XToday) error {
 		for _, v := range activityVolumeList {
 			filterCorrect := model.DefaultFilterCorrect - int(math.Floor(float64(v.AvgFilter)/3+float64(v.StdFilter)/2))
 			if err = e.DB.Model(new(model.NeckActiveHabit)).
-				Where("cow_id = ?", v.CowId).
+				Where("neck_ring_number = ?", v.NeckRingNumber).
 				Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
 				Where("frameid = ?", xframeId).
 				Where("head_date = ?", currDate.Format(model.LayoutDate2)).
@@ -409,44 +469,36 @@ func (e *Entry) ActivityVolumeChanges(xToday *XToday) error {
 			n += 2
 		}*/
 
-		if err = e.DB.Model(new(model.NeckActiveHabit)).
-			Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
-			Where("heat_date = ?", currDate.Format(model.LayoutDate2)).
-			Where("frameid = ?", xframeId).
-			Where("change_filter = ?", model.DefaultChangeFilter).
-			Update("change_filter", MinChangeFilter).Error; err != nil {
-			return xerr.WithStack(err)
-		}
-
-		if err = e.DB.Model(new(model.NeckActiveHabit)).
-			Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
-			Where("heat_date = ?", currDate.Format(model.LayoutDate2)).
-			Where("frameid = ?", xframeId).
-			Where("rumina_filter = ?", model.DefaultRuminaFilter).
-			Update("rumina_filter", MinRuminaFilter).Error; err != nil {
-			return xerr.WithStack(err)
-		}
+		zaplog.Info("ActivityVolumeChanges",
+			zap.Any("xToday", xToday),
+			zap.Any("currDate", currDate.Format(model.LayoutDate2)),
+			zap.Any("xframeId", xframeId),
+			zap.Any("activityVolumeList", activityVolumeList),
+		)
 
 		if err = e.DB.Model(new(model.NeckActiveHabit)).
 			Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
 			Where("heat_date = ?", currDate.Format(model.LayoutDate2)).
 			Where("frameid = ?", xframeId).
-			Where("chew_filter = ?", model.DefaultChewFilter).
-			Update("chew_filter", MinChewFilter).Error; err != nil {
+			Where("change_filter = ?", model.InitChangeFilter).
+			Updates(map[string]interface{}{
+				"change_filter":  MinChangeFilter,
+				"rumina_filter":  MinRuminaFilter,
+				"chew_filter":    MinChewFilter,
+				"filter_correct": model.DefaultFilterCorrect,
+			}).Error; err != nil {
 			return xerr.WithStack(err)
 		}
-
-		if err = e.DB.Model(new(model.NeckActiveHabit)).
-			Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
-			Where("heat_date = ?", currDate.Format(model.LayoutDate2)).
-			Where("frameid = ?", xframeId).
-			Where("filter_correct < ?", model.DefaultFilterCorrect).
-			Where("change_filter < ?", 0).
-			Update("filter_correct", model.DefaultFilterCorrect).Error; err != nil {
-			return xerr.WithStack(err)
+		if xframeId == maxXframeId {
+			xframeId = 0
+			currDate = currDate.AddDate(0, 0, 1)
+			dayTimes = 1
+		} else {
+			xframeId++
+			dayTimes++
 		}
 
-		// 更新评分
+		/*// 更新评分
 		newNeckActiveHabitList := make([]*model.NeckActiveHabit, 0)
 		if err = e.DB.Model(new(model.NeckActiveHabit)).
 			Where("id BETWEEN ? AND ?", xToday.LastMaxHabitId, xToday.CurrMaxHabitId).
@@ -455,10 +507,10 @@ func (e *Entry) ActivityVolumeChanges(xToday *XToday) error {
 			Where("score = ?", 0).
 			Find(&newNeckActiveHabitList).Error; err != nil {
 			return xerr.WithStack(err)
-		}
+		}*/
 		// todo 待开发
 	}
-
+	zaplog.Info("EntryUpdateActiveHabit-ActivityVolumeChanges-Success")
 	return nil
 }
 

+ 53 - 2
module/mqtt/sql.go

@@ -1,8 +1,15 @@
 package mqtt
 
 import (
+	"errors"
 	"kpt-pasture/model"
 
+	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
+	"go.uber.org/zap"
+
+	"gitee.com/xuyiping_admin/pkg/xerr"
+	"gorm.io/gorm"
+
 	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
 )
 
@@ -44,16 +51,60 @@ func (e *Entry) IsExistNeckActiveHabit(neckRingNumber, heatDate string, frameId
 	neckRingProcess := &model.NeckRingProcess{}
 	if err := e.DB.Model(new(model.NeckRingProcess)).
 		Where("neck_ring_number = ?", neckRingNumber).
-		Where("heat_date = ?", heatDate).
+		Where("active_date = ?", heatDate).
 		Where("frameid = ?", frameId).
 		Count(&count).Error; err != nil {
 		return nil, 0
 	}
 	res := &model.NeckActiveHabit{}
 	if neckRingProcess != nil && neckRingProcess.HabitId > 0 {
-		if err := e.DB.Model(new(model.NeckActiveHabit)).Where("id = ?", neckRingProcess.HabitId).First(res).Error; err != nil {
+		if err := e.DB.Model(new(model.NeckActiveHabit)).
+			Where("id = ?", neckRingProcess.HabitId).
+			First(res).Error; err != nil {
 			return nil, 0
 		}
 	}
 	return res, count
 }
+
+// GetMinIdByHeatDate 获取最小的id
+func (e *Entry) GetMinIdByHeatDate(heatDate string, defaultId int64) (int64, error) {
+	xMinId := struct {
+		Id int64
+	}{}
+	if err := e.DB.Model(new(model.NeckActiveHabit)).
+		Select("MIN(id) as id").
+		//Where("heat_date = ?", minHeatDateParse.AddDate(0, 0, -1).Format(model.LayoutDate2)).
+		Where("heat_date >= ?", heatDate).
+		First(&xMinId).Error; err != nil {
+		if errors.Is(err, gorm.ErrRecordNotFound) {
+			xMinId.Id = defaultId
+		} else {
+			return 0, xerr.WithStack(err)
+		}
+	}
+	return xMinId.Id, nil
+}
+
+func (e *Entry) FindPastureList() []*model.AppPastureList {
+	res := make([]*model.AppPastureList, 0)
+	if err := e.DB.Model(new(model.AppPastureList)).
+		Where("is_show = ?", pasturePb.IsShow_Ok).
+		Find(&res).Error; err != nil {
+		zaplog.Error("FindPastureList error", zap.Any("err", err))
+		return res
+	}
+	return res
+}
+
+func (e *Entry) GetSystemConfigure2(pastureId int64, name string) (*model.SystemConfigure, error) {
+	res := &model.SystemConfigure{}
+	if err := e.DB.Model(new(model.SystemConfigure)).
+		Where("name = ?", name).
+		Where("pasture_id = ?", pastureId).
+		Where("is_show = ?", pasturePb.IsShow_Ok).
+		First(res).Error; err != nil {
+		return nil, xerr.WithStack(err)
+	}
+	return res, nil
+}

+ 5 - 3
service/mqtt/interface.go

@@ -114,8 +114,7 @@ func (s *IMqttClient) Run(enter handleMqtt.Entry) {
 	signal.Notify(stop, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
 
 	ticker := time.NewTicker(time.Duration(s.Config.MergeDataTicker) * time.Minute)
-	defer ticker.Stop()
-
+	habitTicker := time.NewTicker(time.Duration(s.Config.MergeDataTicker) * time.Minute)
 	// 创建上下文,用于优雅关闭
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -127,15 +126,18 @@ func (s *IMqttClient) Run(enter handleMqtt.Entry) {
 			case <-stop:
 				cancel()
 				s.Close()
+				habitTicker.Stop()
+				ticker.Stop()
 				return
 			case <-ticker.C:
 				enter.NeckRingOriginalMergeData()
+			case <-habitTicker.C:
+				enter.PastureUpdateActiveHabit()
 			case <-ctx.Done():
 				return
 			}
 		}
 	}()
-
 	// 订阅主题
 	buffer := bufferPool.Get().([]byte)
 	if token := s.Client.Subscribe(s.Config.SubTopic, byte(s.Config.Qos), func(client mqtt.Client, msg mqtt.Message) {

+ 169 - 0
util/model.go

@@ -0,0 +1,169 @@
+package util
+
+import (
+	"fmt"
+	"strings"
+
+	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
+)
+
+type NeckRingOriginal struct {
+	Id              int64                         `json:"id"`
+	PastureId       int64                         `json:"pastureId"`
+	Uuid            string                        `json:"uuid"`
+	NeckRingNumber  string                        `json:"neckRingNumber"`  // 脖环号 (对应老表字段EID1)
+	ActiveDate      string                        `json:"activeDate"`      // 采集时间-天(YYYY-MM-DD对应老表字段heatdate)
+	Hours           int32                         `json:"hours"`           // 采集时间-小时(hours)
+	Frameid         int32                         `json:"frameid"`         // 采集时长(对应老表frameid)
+	Rumina          int32                         `json:"rumina"`          // 反刍时长(rumaina)
+	Intake          int32                         `json:"intake"`          // 采食时长(intake)
+	Inactive        int32                         `json:"inactive"`        // 静止时间(inactive)
+	Gasp            int32                         `json:"gasp"`            // 喘息时长(Other)
+	High            int32                         `json:"high"`            // 活动量(activitys)
+	Active          int32                         `json:"active"`          // 运动时长(High)
+	Other           int32                         `json:"other"`           // 其他时长
+	FirmwareVersion int32                         `json:"firmwareVersion"` // 固件版本(对应老表Version)
+	HardwareVersion int32                         `json:"hardwareVersion"` // 硬件版本
+	Remain          int32                         `json:"remain"`          // 脖环剩余数据量,57之后为上一次上报结果
+	Voltage         int32                         `json:"voltage"`         // 电池电压
+	RestartReason   int32                         `json:"restartReason"`   // 脖环重启原因 (对应老表HIB)
+	Upper           int32                         `json:"upper"`           // 脖环正向比例发射功率
+	ActiveDateType  pasturePb.ActiveTimeType_Kind `json:"ActiveDateTimeType"`
+	IsShow          pasturePb.IsShow_Kind         `json:"isShow"`
+	Imei            string                        `json:"imei"`          // 4G模组IMEI(imei)
+	ReceiveNumber   string                        `json:"receiveNumber"` // 接收器编号
+	CreatedAt       int64                         `json:"createdAt"`
+	UpdatedAt       int64                         `json:"updatedAt"`
+}
+
+func (n *NeckRingOriginal) TableName() string {
+	return "neck_ring_original"
+}
+
+var (
+	AvgHours = int32(6)
+	JoinKey  = "/"
+)
+
+type NeckRingOriginalMerge struct {
+	Rumina         int32
+	Inactive       int32
+	Active         int32
+	Intake         int32
+	Other          int32
+	High           int32
+	Gasp           int32
+	ActiveDate     string
+	NeckRingNumber string
+	XframeId       int32
+	ActiveDateType pasturePb.ActiveTimeType_Kind
+	RecordCount    int32
+}
+
+func (n *NeckRingOriginalMerge) IsMageData(data *NeckRingOriginal, xframeId int32) {
+	if n.ActiveDateType == pasturePb.ActiveTimeType_Two_Hours {
+		n.RecordCount = AvgHours
+	} else {
+		n.RecordCount += 1
+	}
+	n.Rumina += data.Rumina
+	n.Inactive += data.Inactive
+	n.Active += data.Active
+	n.Intake += data.Intake
+	n.Other += data.Other
+	n.Gasp += data.Gasp
+	n.High += data.High
+	n.ActiveDate = data.ActiveDate
+	n.NeckRingNumber = data.NeckRingNumber
+	n.XframeId = xframeId
+}
+
+func (n *NeckRingOriginalMerge) SumAvg() {
+	n.Rumina = n.Rumina / n.RecordCount * n.RecordCount
+	n.Inactive = n.Inactive / n.RecordCount * n.RecordCount
+	n.Active = n.Active / n.RecordCount * n.RecordCount
+	n.Intake = n.Intake / n.RecordCount * n.RecordCount
+	n.Other = n.Other / n.RecordCount * n.RecordCount
+	n.Gasp = n.Gasp / n.RecordCount * n.RecordCount
+	n.High = n.High / n.RecordCount * n.RecordCount
+}
+
+type NeckRingOriginalMap map[string]*NeckRingOriginalMerge
+
+func (n NeckRingOriginalMap) ForMatData() []*NeckActiveHabit {
+	res := make([]*NeckActiveHabit, 0)
+	for key, v := range n {
+		keyStrList := strings.Split(key, JoinKey)
+		if len(keyStrList) != 3 {
+			continue
+		}
+		res = append(res, NewNeckActiveHabit(v))
+	}
+	return res
+}
+
+func NewNeckActiveHabit(data *NeckRingOriginalMerge) *NeckActiveHabit {
+	return &NeckActiveHabit{
+		Frameid:        data.XframeId,
+		HeatDate:       data.ActiveDate,
+		NeckRingNumber: data.NeckRingNumber,
+		Active:         data.Active,
+		Gasp:           data.Gasp,
+		High:           data.High,
+		Inactive:       data.Inactive,
+		Intake:         data.Intake,
+		Other:          data.Other,
+		Rumina:         data.Rumina,
+		ActiveTime:     fmt.Sprintf("%s %02d:00:00", data.ActiveDate, data.XframeId),
+	}
+}
+
+type NeckActiveHabit struct {
+	Id                      int64                 `json:"id"`
+	CowId                   int64                 `json:"cowId"`
+	NeckRingNumber          string                `json:"neckRingNumber"`
+	ActiveTime              string                `json:"activeTime"`
+	Frameid                 int32                 `json:"frameid"`
+	HeatDate                string                `json:"heatDate"`
+	Rumina                  int32                 `json:"rumina"`
+	Intake                  int32                 `json:"intake"`
+	Inactive                int32                 `json:"inactive"`
+	Gasp                    int32                 `json:"gasp"`
+	Other                   int32                 `json:"other"`
+	High                    int32                 `json:"high"`
+	Active                  int32                 `json:"active"`
+	FilterHigh              int32                 `json:"filterHigh"`
+	FilterRumina            int32                 `json:"filterRumina"`
+	FilterChew              int32                 `json:"filterChew"`
+	WeekHigh                int32                 `json:"weekHigh"`
+	AvgHighHabit            int32                 `json:"avgHighHabit"`
+	AvgRuminaHabit          int32                 `json:"avgRuminaHabit"`
+	AvgIntakeHabit          int32                 `json:"avgIntakeHabit"`
+	AvgChewHabit            int32                 `json:"avgChewHabit"`
+	AvgInactiveHabit        int32                 `json:"avgInactiveHabit"`
+	AvgOtherHabit           int32                 `json:"avgOtherHabit"`
+	ChangeHigh              int32                 `json:"changeHigh"`
+	ChangeRumina            int32                 `json:"changeRumina"`
+	ChangeChew              int32                 `json:"changeChew"`
+	ChangeAdjust            int32                 `json:"changeAdjust"`
+	ChangeFilter            int32                 `json:"changeFilter"`
+	RuminaFilter            int32                 `json:"ruminaFilter"`
+	ChewFilter              int32                 `json:"chewFilter"`
+	FilterCorrect           int32                 `json:"filterCorrect"`
+	SumRumina               int32                 `json:"sumRumina"`
+	SumIntake               int32                 `json:"sumIntake"`
+	SumInactive             int32                 `json:"sumInactive"`
+	SumAct                  int32                 `json:"sumAct"`
+	SumMinHigh              int32                 `json:"sumMinHigh"`
+	SumMaxHigh              int32                 `json:"sumMaxHigh"`
+	SumMinChew              int32                 `json:"SumMinChew"`
+	SumRuminaBeforeThreeDay int32                 `json:"sumRuminaBeforeThreeDay"`
+	SumIntakeBeforeThreeDay int32                 `json:"sumIntakeBeforeThreeDay"`
+	Score                   int32                 `json:"score"`
+	IsMaxTime               pasturePb.IsShow_Kind `json:"isMaxTime"`
+	IsShow                  pasturePb.IsShow_Kind `json:"isShow"`
+	ReceiveNumber           int32                 `json:"receiveNumber"`
+	RecordCount             int32                 `json:"recordCount"`
+	CreatedAt               int64                 `json:"createdAt"`
+	UpdatedAt               int64                 `json:"updatedAt"`
+}

+ 3 - 2
util/util.go

@@ -369,7 +369,8 @@ func GetNeckRingActiveTimer(frameId int32) (dateTime string, hours int) {
 		return "", 0
 	}
 
-	nowTime := time.Now()
+	//nowTime := time.Now()
+	nowTime := time.Unix(1736440903, 0)
 	currHour := nowTime.Hour()
 	// 处理2小时的特殊 farmId
 	hours, ok := SpecialHours[int(frameId)]
@@ -387,7 +388,7 @@ func GetNeckRingActiveTimer(frameId int32) (dateTime string, hours int) {
 	hours += units / 3
 
 	if hours > currHour {
-		for i := 1; i <= 20; i++ {
+		for i := 0; i <= 20; i++ {
 			twentyHoursAgo := nowTime.Add(-time.Duration(i) * time.Hour).Hour()
 			if twentyHoursAgo == 0 {
 				twentyHoursAgo = 24

+ 34 - 0
util/util_test.go

@@ -506,6 +506,40 @@ func TestGetNeckRingActiveTimer(t *testing.T) {
 	}
 }
 
+type XToday struct {
+	XBegDate       string
+	XEndDate       string
+	LastMaxHabitId int64
+	CurrMaxHabitId int64
+	XMin2Id        int64
+	XMin7Id        int64
+	ActiveLowest   int64
+	RuminaLowest   int64
+	ActiveLow      int64
+	ActiveMiddle   int64
+	ActiveHigh     int64
+}
+
 func Test_demo(t *testing.T) {
+	xToday := &XToday{
+		XBegDate: "2025-01-08",
+		XEndDate: "2025-01-10",
+	}
 
+	currDate, _ := time.Parse("2006-01-02", xToday.XBegDate)
+	XEndDateTime, _ := time.Parse("2006-01-02", xToday.XEndDate)
+	xframeId := int64(0)
+	maxXframeId := int64(11)
+	dayTimes := int64(1)
+	for currDate.Format("2006-01-02") < XEndDateTime.Format("2006-01-02") || (currDate == XEndDateTime && xframeId <= maxXframeId) {
+		fmt.Println(currDate.Format("2006-01-02"), xframeId, dayTimes)
+		if xframeId == maxXframeId {
+			xframeId = 0
+			currDate = currDate.AddDate(0, 0, 1)
+			dayTimes = 1
+		} else {
+			xframeId++
+			dayTimes++
+		}
+	}
 }