Selaa lähdekoodia

neckRing: update

Yi 2 kuukautta sitten
vanhempi
commit
0b04d7bc74

+ 1 - 1
.drone.yml

@@ -30,7 +30,7 @@ steps:
         from_secret: aliyuncs_password
       repo: registry.cn-hangzhou.aliyuncs.com/kpt-event/kpt-pasture
       registry: registry.cn-hangzhou.aliyuncs.com
-      tags: [ http-test,mqtt-test ]
+      tags: [ http.test,mqtt-test ]
 
 trigger:
   branch:

+ 11 - 7
config/app.develop.yaml

@@ -36,13 +36,16 @@ side_work_setting:
       default: 5
 cron:
   crontab_start_run: false
-  update_cow_info: "0 01 1 * * ?"
-  generate_work_order: "0 05 1 * * ?"
-  immunization_plan: "0 10 1 * * ?"
-  same_time_plan: "0 15 1 * * ?"
-  update_same_time: "0 20 1 * * ?"
-  system_basic_crontab: "0 25 1 * * ?"
-  cow_pregnant: "0 00 15 * * ?"
+  update_cow_info: "0 01 1 * * ?"      # 每天凌晨1点01分执行
+  generate_work_order: "0 05 1 * * ?"  # 每天凌晨1点05分执行
+  immunization_plan: "0 10 1 * * ?"    # 每天凌晨1点10分执行
+  same_time_plan: "0 15 1 * * ?"      # 每天凌晨1点15分执行
+  update_same_time: "0 20 1 * * ?"    # 每天凌晨1点20分执行
+  system_basic_crontab: "0 25 1 * * ?"  # 每天凌晨1点25分执行
+  cow_pregnant: "0 00 15 * * ?"         # 每天15点执行
+  update_active_habit: "0 */2 * * * ?"  # 每2分钟执行一次
+  neck_ring_estrus: "0 */5 * * * ?"     # 每5分钟执行一次
+
 mqtt:
   broker: "kptyun.com:1983"
   username: "kptmqtt"
@@ -55,3 +58,4 @@ mqtt:
   auto_reconnect: true
   reconnect_interval: 10
   work_number: 1
+  merge_data_ticker: 2   # 2分钟合并一次数据

+ 3 - 0
config/app.go

@@ -57,6 +57,8 @@ type CronSetting struct {
 	UpdateSameTime     string `yaml:"update_same_time"`     //  更新同期
 	SystemBasicCrontab string `yaml:"system_basic_crontab"` //  系统基础定时任务
 	CowPregnant        string `yaml:"cow_pregnant"`         //  月度牛只怀孕清单
+	UpdateActiveHabit  string `yaml:"update_active_habit"`  //  脖环2小时数据重新整合
+	NeckRingEstrus     string `yaml:"neck_ring_estrus"`     //  脖环牛只发情
 }
 
 type JwtTokenKeyConfig struct {
@@ -168,6 +170,7 @@ type MqttSetting struct {
 	AutoReconnect     bool   `json:"autoReconnect" yaml:"auto_reconnect"`
 	ReconnectInterval int    `json:"reconnectInterval" yaml:"reconnect_interval"`
 	WorkNumber        int    `json:"workNumber" yaml:"work_number"`
+	MergeDataTicker   int    `json:"mergeDataTicker" yaml:"merge_data_ticker"`
 }
 
 func (a *AppConfig) Name() string {

+ 4 - 1
config/app.test.yaml

@@ -31,6 +31,8 @@ cron:
   update_same_time: "0 20 1 * * ?"
   system_basic_crontab: "0 25 1 * * ?"
   cow_pregnant: "0 00 15 * * ?"
+  update_active_habit: "0 */2 * * * ?"  # 每2分钟执行一次
+  neck_ring_estrus: "0 */5 * * * ?"     # 每5分钟执行一次
 
 mqtt:
   broker: "kptyun.com:1983"
@@ -43,4 +45,5 @@ mqtt:
   connect_timeout: 10
   auto_reconnect: true
   reconnect_interval: 10
-  work_number: 1
+  work_number: 1
+  merge_data_ticker: 2   # 2分钟合并一次数据

+ 15 - 5
dep/di_crontab.go

@@ -72,10 +72,20 @@ func EntryCrontab(dependency CrontabDependency) *cron.Crontab {
 		panic(err)
 	}
 
-	/*	err = newCrontab.Bind("CowPregnant", cs.CowPregnant, dependency.CrontabHub.CowPregnant)
-		if err != nil {
-			panic(err)
-		}
-	*/
+	/*err = newCrontab.Bind("CowPregnant", cs.CowPregnant, dependency.CrontabHub.CowPregnant)
+	if err != nil {
+		panic(err)
+	}*/
+
+	err = newCrontab.Bind("NeckRingEstrus", cs.NeckRingEstrus, dependency.CrontabHub.EntryCowEstrus)
+	if err != nil {
+		panic(err)
+	}
+
+	err = newCrontab.Bind("UpdateActiveHabit", cs.UpdateActiveHabit, dependency.CrontabHub.EntryUpdateActiveHabit)
+	if err != nil {
+		panic(err)
+	}
+
 	return newCrontab
 }

+ 2 - 2
model/neck_ring_original.go

@@ -100,10 +100,10 @@ func (n NeckRingOriginalMap) ForMatData() []*NeckActiveHabit {
 			continue
 		}
 		neckRingNumber := keyStrList[0]
-		activeDate := keyStrList[1]
+		heatDate := keyStrList[1]
 		frameId := keyStrList[2]
 		frameIdInt, _ := strconv.Atoi(frameId)
-		res = append(res, NewNeckActiveHabit(int32(frameIdInt), activeDate, neckRingNumber, v))
+		res = append(res, NewNeckActiveHabit(int32(frameIdInt), heatDate, neckRingNumber, v))
 	}
 	return res
 }

+ 24 - 0
model/neck_ring_process.go

@@ -0,0 +1,24 @@
+package model
+
+type NeckRingProcess struct {
+	Id             int64  `json:"id"`
+	HabitId        int64  `json:"habitId"`
+	NeckRingNumber string `json:"neckRingNumber"`
+	ActiveDate     string `json:"activeDate"`
+	Frameid        int32  `json:"frameid"`
+	CreatedAt      int64  `json:"createdAt"`
+	UpdatedAt      int64  `json:"updatedAt"`
+}
+
+func (n *NeckRingProcess) TableName() string {
+	return "neck_ring_process"
+}
+
+func NewNeckRingProcess(neckActiveHabit *NeckActiveHabit) *NeckRingProcess {
+	return &NeckRingProcess{
+		HabitId:        neckActiveHabit.Id,
+		NeckRingNumber: neckActiveHabit.NeckRingNumber,
+		ActiveDate:     neckActiveHabit.HeatDate,
+		Frameid:        neckActiveHabit.Frameid,
+	}
+}

+ 0 - 36
model/neck_ring_unregist.go

@@ -1,36 +0,0 @@
-package model
-
-import pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
-
-type NeckRingUnRegister struct {
-	Id              int64                         `json:"id"`
-	PastureId       int64                         `json:"pastureId"`
-	Uuid            string                        `json:"uuid"`
-	NeckRingNumber  string                        `json:"neckRingNumber"`  // 脖环号 (对应老表字段EID1)
-	ActiveDate      string                        `json:"activeDate"`      // 采集时间-天(YYYY-MM-DD对应老表字段heatdate)
-	Hours           int32                         `json:"hours"`           // 采集时间-小时(hours)
-	Frameid         int32                         `json:"frameid"`         // 采集时长(对应老表frameid)
-	Rumina          int32                         `json:"rumina"`          // 反刍时长(rumaina)
-	Intake          int32                         `json:"intake"`          // 采食时长(intake)
-	Inactive        int32                         `json:"inactive"`        // 静止时间(inactive)
-	Gasp            int32                         `json:"gasp"`            // 喘息时长(Other)
-	High            int32                         `json:"high"`            // 活动量(activitys)
-	Active          int32                         `json:"active"`          // 运动时长(High)
-	Other           int32                         `json:"other"`           // 其他时长
-	FirmwareVersion int32                         `json:"firmwareVersion"` // 固件版本(对应老表Version)
-	HardwareVersion int32                         `json:"hardwareVersion"` // 硬件版本
-	Remain          int32                         `json:"remain"`          // 脖环剩余数据量,57之后为上一次上报结果
-	Voltage         int32                         `json:"voltage"`         // 电池电压
-	RestartReason   int32                         `json:"restartReason"`   // 脖环重启原因 (对应老表HIB)
-	Upper           int32                         `json:"upper"`           // 脖环正向比例发射功率
-	ActiveDateType  pasturePb.ActiveTimeType_Kind `json:"ActiveDateTimeType"`
-	IsShow          pasturePb.IsShow_Kind         `json:"isShow"`
-	Imei            string                        `json:"imei"`          // 4G模组IMEI(imei)
-	ReceiveNumber   string                        `json:"receiveNumber"` // 接收器编号
-	CreatedAt       int64                         `json:"createdAt"`
-	UpdatedAt       int64                         `json:"updatedAt"`
-}
-
-func (s *NeckRingUnRegister) TableName() string {
-	return "neck_ring_unregister"
-}

+ 3 - 0
module/crontab/interface.go

@@ -31,4 +31,7 @@ type Crontab interface {
 	SameTimePlan() error
 	UpdateSameTime() error
 	SystemBasicCrontab() error
+
+	EntryUpdateActiveHabit() error // 更新脖环数据 2分钟执行一下
+	EntryCowEstrus() error         // 获取牛只发情数据 5分钟执行一下
 }

+ 17 - 12
module/crontab/estrus_warning.go → module/crontab/neck_ring_estrus.go

@@ -1,7 +1,6 @@
 package crontab
 
 import (
-	"context"
 	"kpt-pasture/model"
 	"time"
 
@@ -11,8 +10,6 @@ import (
 	"go.uber.org/zap"
 
 	"gitee.com/xuyiping_admin/pkg/xerr"
-
-	"github.com/hibiken/asynq"
 )
 
 const (
@@ -25,23 +22,31 @@ const (
 	NormalChangJust = 10
 )
 
-func (e *Entry) EntryCowEstrus(ctx context.Context, t *asynq.Task) error {
+func (e *Entry) EntryCowEstrus() (err error) {
 	activeLowValue := e.GetSystemConfigure(model.ActiveLow).Value
 	activeMiddleValue := e.GetSystemConfigure(model.ActiveMiddle).Value
 	activeHighValue := e.GetSystemConfigure(model.ActiveHigh).Value
 	lastMaxEstrusId := e.GetSystemConfigure(model.MaxEstrus).Value
 
 	currentMaxHabit := &model.NeckActiveHabit{}
-	if err := e.DB.Model(new(model.NeckActiveHabit)).
+	if err = e.DB.Model(new(model.NeckActiveHabit)).
 		Order("id desc").
 		First(currentMaxHabit).Error; err != nil {
 		return xerr.WithStack(err)
 	}
 
+	defer func() {
+		if err == nil {
+			e.DB.Model(new(model.SystemConfigure)).
+				Where("name = ?", model.MaxEstrus).
+				Update("value", currentMaxHabit.Id)
+		}
+	}()
+
 	xToday := &XToday{}
-	if err := e.DB.Model(new(model.NeckActiveHabit)).
+	if err = e.DB.Model(new(model.NeckActiveHabit)).
 		Select(`MIN(h.heat_date) as x_beg_date, MAX(h.heat_date) as x_end_date`).
-		Where("id BETWEEN ? AND ?", lastMaxEstrusId, currentMaxHabit).
+		Where("id BETWEEN ? AND ?", lastMaxEstrusId, currentMaxHabit.Id).
 		First(xToday).Error; err != nil {
 		return xerr.WithStack(err)
 	}
@@ -57,13 +62,14 @@ func (e *Entry) EntryCowEstrus(ctx context.Context, t *asynq.Task) error {
 	xToday.ActiveMiddle = int64(activeMiddleValue)
 	xToday.ActiveHigh = int64(activeHighValue)
 
-	if err := e.CowEstrusWarning(ctx, xToday); err != nil {
+	if err = e.CowEstrusWarning(xToday); err != nil {
 		return xerr.WithStack(err)
 	}
+
 	return nil
 }
 
-func (e *Entry) CowEstrusWarning(ctx context.Context, xToday *XToday) error {
+func (e *Entry) CowEstrusWarning(xToday *XToday) error {
 	startDate, err := time.Parse(model.LayoutDate2, xToday.XBegDate)
 	if err != nil {
 		return xerr.WithStack(err)
@@ -89,7 +95,7 @@ func (e *Entry) CowEstrusWarning(ctx context.Context, xToday *XToday) error {
 
 		eventEstrusList := make([]*model.EventEstrus, 0)
 		for _, v := range neckActiveHabitList {
-			if ok := e.IsAdJustLow(ctx, xToday, v); ok {
+			if ok := e.IsAdJustLow(xToday, v); ok {
 				continue
 			}
 			cft := float32(0)
@@ -181,12 +187,11 @@ func (e *Entry) CowEstrusWarning(ctx context.Context, xToday *XToday) error {
 		}
 		startDate.AddDate(0, 0, 1)
 	}
-
 	return nil
 }
 
 // IsAdJustLow 是否低于最低活动量
-func (e *Entry) IsAdJustLow(ctx context.Context, xToday *XToday, habit *model.NeckActiveHabit) bool {
+func (e *Entry) IsAdJustLow(xToday *XToday, habit *model.NeckActiveHabit) bool {
 	ruminaAdJust := float64(0)
 	switch {
 	case habit.RuminaFilter > MaxRuminaAdJust:

+ 13 - 11
module/crontab/neck_ring.go → module/crontab/neck_ring_habit.go

@@ -18,10 +18,11 @@ const (
 	DefaultNb       = 30
 )
 
-func (e *Entry) ActiveHabit() error {
+func (e *Entry) EntryUpdateActiveHabit() (err error) {
 	lastMaxHabitId := e.GetSystemConfigure(model.MaxHabit).Value
 	currentMaxHabit := &model.NeckActiveHabit{}
-	if err := e.DB.Model(new(model.NeckActiveHabit)).
+	if err = e.DB.Model(new(model.NeckActiveHabit)).
+		Where("id > ?", lastMaxHabitId).
 		Order("id desc").First(currentMaxHabit).Error; err != nil {
 		return xerr.WithStack(err)
 	}
@@ -32,7 +33,7 @@ func (e *Entry) ActiveHabit() error {
 	}
 
 	// 统一更新is_max_time为0
-	if err := e.DB.Model(new(model.NeckActiveHabit)).
+	if err = e.DB.Model(new(model.NeckActiveHabit)).
 		Where("is_max_time = ?", pasturePb.IsShow_Ok).
 		Update("is_max_time", pasturePb.IsShow_No).Error; err != nil {
 		return xerr.WithStack(err)
@@ -40,7 +41,7 @@ func (e *Entry) ActiveHabit() error {
 
 	// 获取这段执行数据内最大日期和最小日期
 	xToday := &XToday{}
-	if err := e.DB.Model(new(model.NeckActiveHabit)).
+	if err = e.DB.Model(new(model.NeckActiveHabit)).
 		Select(`MIN(h.heat_date) as x_beg_date, MAX(h.heat_date) as x_end_date`).
 		Where("id BETWEEN ? AND ?", lastMaxHabitId, currentMaxHabit).
 		First(xToday).Error; err != nil {
@@ -65,6 +66,14 @@ func (e *Entry) ActiveHabit() error {
 	if err != nil {
 		return xerr.WithStack(err)
 	}
+	defer func() {
+		// 更新最后一次执行的id值
+		if err == nil {
+			e.DB.Model(new(model.SystemConfigure)).
+				Where("name = ?", model.MaxHabit).
+				Update("value", currentMaxHabit.Id)
+		}
+	}()
 	xToday.XMin2Id = xMin2Id
 	xToday.XMin7Id = xMin7Id
 
@@ -96,13 +105,6 @@ func (e *Entry) ActiveHabit() error {
 		return xerr.WithStack(err)
 	}
 
-	// 更新最后一次执行的id值
-	if err = e.DB.Model(new(model.SystemConfigure)).
-		Where("name = ?", model.MaxHabit).
-		Update("value = ?", xToday.CurrMaxHabitId+1).
-		Error; err != nil {
-		return xerr.WithStack(err)
-	}
 	return nil
 }
 

+ 28 - 11
module/mqtt/handle.go

@@ -22,10 +22,9 @@ import (
 )
 
 type DataInsertNeckRingLog struct {
-	NeckRingOriginalData   []*model.NeckRingOriginal
-	NeckRingErrorData      []*model.NeckRingError
-	NeckRingUnRegisterData []*model.NeckRingUnRegister
-	Mx                     *sync.RWMutex
+	NeckRingOriginalData []*model.NeckRingOriginal
+	NeckRingErrorData    []*model.NeckRingError
+	Mx                   *sync.RWMutex
 }
 
 var (
@@ -33,11 +32,11 @@ var (
 	batchList    = make([]*model.NeckRingOriginal, 0, batchSize)
 	defaultLimit = int32(100)
 	DSMLog       = &DataInsertNeckRingLog{
-		NeckRingOriginalData:   make([]*model.NeckRingOriginal, 0),
-		NeckRingErrorData:      make([]*model.NeckRingError, 0),
-		NeckRingUnRegisterData: make([]*model.NeckRingUnRegister, 0),
-		Mx:                     &sync.RWMutex{},
+		NeckRingOriginalData: make([]*model.NeckRingOriginal, 0),
+		NeckRingErrorData:    make([]*model.NeckRingError, 0),
+		Mx:                   &sync.RWMutex{},
 	}
+	isDelete bool
 )
 
 func (e *Entry) NeckRingHandle(data []byte) {
@@ -330,7 +329,7 @@ func (e *Entry) NeckRingOriginalMergeData() {
 		limit = defaultLimit
 	}
 
-	updateOriginalMaxId := e.GetSystemConfigure(model.MaxEstrus).Value
+	updateOriginalMaxId := e.GetSystemConfigure(model.UpdateOriginalMaxId).Value
 	neckRingList := make([]*model.NeckRingOriginal, 0)
 	if err := e.DB.Model(new(model.NeckRingOriginal)).
 		Where("id > ?", updateOriginalMaxId).
@@ -343,6 +342,20 @@ func (e *Entry) NeckRingOriginalMergeData() {
 		return
 	}
 
+	defer func() {
+		currTime := time.Now()
+		// 删除15天前的数据
+		if currTime.Day()%15 == 0 && !isDelete {
+			e.DB.Model(new(model.NeckRingOriginal)).
+				Where("active_date < ?", currTime.AddDate(0, 0, -15).Format(model.LayoutDate2)).
+				Delete(new(model.NeckRingOriginal))
+			e.DB.Model(new(model.NeckRingProcess)).
+				Where("active_date < ?", currTime.AddDate(0, 0, -5).Format(model.LayoutDate2)).
+				Delete(new(model.NeckRingProcess))
+			isDelete = true
+		}
+	}()
+
 	originalMapData := make(map[string]*model.NeckRingOriginalMerge)
 	// 合并成2个小时的
 	for _, v := range neckRingList {
@@ -360,24 +373,28 @@ func (e *Entry) NeckRingOriginalMergeData() {
 		v.SumAvg()
 	}
 
+	zaplog.Info("NeckRingOriginalMergeData", zap.Any("originalMapData", originalMapData))
 	// 更新脖环牛只相关信息
 	newNeckActiveHabitList := model.NeckRingOriginalMap(originalMapData).ForMatData()
 	if err := e.DB.Transaction(func(tx *gorm.DB) error {
 		// 更新已处理过的id
+		processMaxId := neckRingList[len(neckRingList)-1].Id
+		fmt.Println("updateOriginalMaxId", processMaxId)
 		if err := tx.Model(new(model.SystemConfigure)).
 			Where("name = ?", model.UpdateOriginalMaxId).
-			Update("value", neckRingList[len(neckRingList)-1].Id).
+			Update("value", processMaxId).
 			Error; err != nil {
 			return xerr.WithStack(err)
 		}
 
 		for _, neckActiveHabit := range newNeckActiveHabitList {
-			// 新数据直接插入 todo 待优化
+			// 新数据直接插入
 			historyNeckActiveHabit, ct := e.IsExistNeckActiveHabit(neckActiveHabit.NeckRingNumber, neckActiveHabit.HeatDate, neckActiveHabit.Frameid)
 			if ct <= 0 {
 				if err := tx.Create(neckActiveHabit).Error; err != nil {
 					return xerr.WithStack(err)
 				}
+				tx.Create(model.NewNeckRingProcess(neckActiveHabit))
 				continue
 			}
 

+ 9 - 5
module/mqtt/sql.go

@@ -41,15 +41,19 @@ func (e *Entry) GetSystemConfigure(name string) *model.SystemConfigure {
 
 func (e *Entry) IsExistNeckActiveHabit(neckRingNumber, heatDate string, frameId int32) (*model.NeckActiveHabit, int64) {
 	count := int64(0)
-	res := &model.NeckActiveHabit{}
-	if err := e.DB.Model(new(model.NeckActiveHabit)).
+	neckRingProcess := &model.NeckRingProcess{}
+	if err := e.DB.Model(new(model.NeckRingProcess)).
 		Where("neck_ring_number = ?", neckRingNumber).
 		Where("heat_date = ?", heatDate).
 		Where("frameid = ?", frameId).
-		Count(&count).
-		First(res).
-		Error; err != nil {
+		Count(&count).Error; err != nil {
 		return nil, 0
 	}
+	res := &model.NeckActiveHabit{}
+	if neckRingProcess != nil && neckRingProcess.HabitId > 0 {
+		if err := e.DB.Model(new(model.NeckActiveHabit)).Where("id = ?", neckRingProcess.HabitId).First(res).Error; err != nil {
+			return nil, 0
+		}
+	}
 	return res, count
 }

+ 2 - 2
service/mqtt/interface.go

@@ -112,8 +112,8 @@ func (s *IMqttClient) Run(enter handleMqtt.Entry) {
 	// 设置信号监听以优雅关闭服务器
 	stop := make(chan os.Signal, 1)
 	signal.Notify(stop, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-	// 每隔30分钟
-	ticker := time.NewTicker(10 * time.Minute)
+
+	ticker := time.NewTicker(time.Duration(s.Config.MergeDataTicker) * time.Minute)
 	defer ticker.Stop()
 
 	// 创建上下文,用于优雅关闭

+ 4 - 0
util/util.go

@@ -385,9 +385,13 @@ func GetNeckRingActiveTimer(frameId int32) (dateTime string, hours int) {
 	hours = int(math.Floor(float64(frameId)/10) * 2)
 	units := int(frameId % 10)
 	hours += units / 3
+
 	if hours > currHour {
 		for i := 1; i <= 20; i++ {
 			twentyHoursAgo := nowTime.Add(-time.Duration(i) * time.Hour).Hour()
+			if twentyHoursAgo == 0 {
+				twentyHoursAgo = 24
+			}
 			if hours == twentyHoursAgo {
 				day = -1
 				break

+ 0 - 17
util/util_test.go

@@ -2,7 +2,6 @@ package util
 
 import (
 	"fmt"
-	"math"
 	"testing"
 	"time"
 
@@ -508,21 +507,5 @@ func TestGetNeckRingActiveTimer(t *testing.T) {
 }
 
 func Test_demo(t *testing.T) {
-	frameId := 76
-	day := 0
-	nowTime := time.Now()
-	hours := int(math.Floor(float64(frameId)/10) * 2)
-	fmt.Println(hours)
-	units := frameId % 10
-	hours += units / 3
-	fmt.Println(hours)
-	currHours := 13
-	if hours > currHours {
-		if nowTime.Add(-20*time.Hour).Hour() == hours {
-			day = -1
-		}
-	}
-	dateTime := nowTime.AddDate(0, 0, day).Format(Layout)
-	fmt.Println(hours, dateTime, units)
 
 }