Browse Source

mqtt: 调整mqtt合并数据业务逻辑

Yi 2 months ago
parent
commit
1e0ff48093

+ 1 - 2
config/app.develop.yaml

@@ -43,12 +43,11 @@ cron:
   update_same_time: "0 20 1 * * ?"
   system_basic_crontab: "0 25 1 * * ?"
   cow_pregnant: "0 00 15 * * ?"
-  neck_ring: "*/30 * * * * ?"
 mqtt:
   broker: "kptyun.com:1983"
   username: "kptmqtt"
   password: "kepaiteng"
-  sub_topic: "kptmqtt"
+  sub_topic: "/user/heatwatch/neckring/post"
   qos: 0
   retain: false
   keep_alive: 60

+ 0 - 1
config/app.go

@@ -57,7 +57,6 @@ type CronSetting struct {
 	UpdateSameTime     string `yaml:"update_same_time"`     //  更新同期
 	SystemBasicCrontab string `yaml:"system_basic_crontab"` //  系统基础定时任务
 	CowPregnant        string `yaml:"cow_pregnant"`         //  月度牛只怀孕清单
-	NeckRing           string `yaml:"neck_ring"`            //  脖环数据更新
 }
 
 type JwtTokenKeyConfig struct {

+ 11 - 1
config/app.test.yaml

@@ -22,11 +22,21 @@ redis_setting:
 jwt_secret: "sUd7j%UfJMt59ywh"
 cache_key_suffix: "gmym"
 
+cron:
+  crontab_start_run: false
+  update_cow_info: "0 01 1 * * ?"
+  generate_work_order: "0 05 1 * * ?"
+  immunization_plan: "0 10 1 * * ?"
+  same_time_plan: "0 15 1 * * ?"
+  update_same_time: "0 20 1 * * ?"
+  system_basic_crontab: "0 25 1 * * ?"
+  cow_pregnant: "0 00 15 * * ?"
+
 mqtt:
   broker: "kptyun.com:1983"
   username: "kptmqtt"
   password: "kepaiteng"
-  sub_topic: "kptmqtt"
+  sub_topic: "/user/heatwatch/neckring/post"
   qos: 0
   retain: false
   keep_alive: 60

+ 0 - 4
dep/di_crontab.go

@@ -77,9 +77,5 @@ func EntryCrontab(dependency CrontabDependency) *cron.Crontab {
 			panic(err)
 		}
 	*/
-	err = newCrontab.Bind("NeckRing", cs.NeckRing, dependency.CrontabHub.NeckRingOriginalMergeData)
-	if err != nil {
-		panic(err)
-	}
 	return newCrontab
 }

+ 28 - 0
model/data_indicators.go

@@ -0,0 +1,28 @@
+package model
+
+type DataIndicators struct {
+	Id            int64  `json:"id"`
+	PastureId     int64  `json:"pastureId"`
+	DateTime      string `json:"dateTime"`
+	IndicatorKind string `json:"indicatorKind"`
+	Name          string `json:"name"`
+	Value         string `json:"value"`
+	Remarks       string `json:"remarks"`
+	CreatedAt     int64  `json:"createdAt"`
+	UpdatedAt     int64  `json:"updatedAt"`
+}
+
+func (d *DataIndicators) TableName() string {
+	return "data_indicators"
+}
+
+func NewDataIndicators(pastureId int64, dataTime, value, remarks, indicatorKind, name string) *DataIndicators {
+	return &DataIndicators{
+		PastureId:     pastureId,
+		DateTime:      dataTime,
+		IndicatorKind: indicatorKind,
+		Name:          name,
+		Value:         value,
+		Remarks:       remarks,
+	}
+}

+ 4 - 20
model/neck_active_habit.go

@@ -11,14 +11,14 @@ const (
 	DefaultRuminaFilter  = -10000
 	DefaultChewFilter    = -10000
 	DefaultFilterCorrect = 100
+	DefaultWeeklyActive  = 105
 )
 
 type NeckActiveHabit struct {
 	Id                      int64                 `json:"id"`
 	CowId                   int64                 `json:"cowId"`
 	NeckRingNumber          string                `json:"neckRingNumber"`
-	Lact                    int32                 `json:"lact"`
-	CalvingAge              int64                 `json:"calvingAge"`
+	ActiveTime              string                `json:"activeTime"`
 	Frameid                 int32                 `json:"frameid"`
 	HeatDate                string                `json:"heatDate"`
 	Rumina                  int32                 `json:"rumina"`
@@ -28,8 +28,6 @@ type NeckActiveHabit struct {
 	Other                   int32                 `json:"other"`
 	High                    int32                 `json:"high"`
 	Active                  int32                 `json:"active"`
-	Voltage                 int32                 `json:"voltage"`
-	Version                 int32                 `json:"version"`
 	FilterHigh              int32                 `json:"filterHigh"`
 	FilterRumina            int32                 `json:"filterRumina"`
 	FilterChew              int32                 `json:"filterChew"`
@@ -62,7 +60,6 @@ type NeckActiveHabit struct {
 	IsShow                  pasturePb.IsShow_Kind `json:"isShow"`
 	ReceiveNumber           int32                 `json:"receiveNumber"`
 	RecordCount             int32                 `json:"recordCount"`
-	ActiveTime              string                `json:"activeTime"`
 	CreatedAt               int64                 `json:"createdAt"`
 	UpdatedAt               int64                 `json:"updatedAt"`
 }
@@ -71,24 +68,11 @@ func (n *NeckActiveHabit) TableName() string {
 	return "neck_active_habit"
 }
 
-func NewNeckActiveHabit(defaultWeeklyActive, frameId int32, heatDate, neckRingNumber string, cow *Cow, data *NeckRingOriginalMerge) *NeckActiveHabit {
-	cowId := int64(0)
-	lact := int32(0)
-	if cow != nil {
-		cowId = cow.Id
-		lact = cow.Lact
-	}
-	weekHigh := cow.WeeklyActive
-	if cow.WeeklyActive == 0 {
-		weekHigh = defaultWeeklyActive
-	}
+func NewNeckActiveHabit(frameId int32, heatDate, neckRingNumber string, data *NeckRingOriginalMerge) *NeckActiveHabit {
 	return &NeckActiveHabit{
 		Frameid:        frameId,
 		HeatDate:       heatDate,
 		NeckRingNumber: neckRingNumber,
-		Lact:           lact,
-		CalvingAge:     cow.CalvingAge,
-		CowId:          cowId,
 		Active:         data.Active,
 		Gasp:           data.Gasp,
 		High:           data.High,
@@ -96,7 +80,7 @@ func NewNeckActiveHabit(defaultWeeklyActive, frameId int32, heatDate, neckRingNu
 		Intake:         data.Intake,
 		Other:          data.Other,
 		Rumina:         data.Rumina,
-		WeekHigh:       weekHigh,
+		WeekHigh:       DefaultWeeklyActive,
 		IsShow:         pasturePb.IsShow_No,
 		IsMaxTime:      pasturePb.IsShow_No,
 		ChangeFilter:   DefaultChangeFilter,

+ 26 - 24
model/neck_ring_error.go

@@ -3,30 +3,32 @@ package model
 import pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
 
 type NeckRingError struct {
-	Id             int64                 `json:"id"`
-	Uuid           string                `json:"uuid"`
-	FrameId        int32                 `json:"frameId"`
-	Low            int32                 `json:"low"`
-	High           int32                 `json:"high"`
-	Rumina         int32                 `json:"rumina"`
-	Active         int32                 `json:"active"`
-	Intake         int32                 `json:"intake"`
-	Inactive       int32                 `json:"inactive"`
-	Other          int32                 `json:"other"`
-	Voltage        int32                 `json:"voltage"`
-	Upper          int32                 `json:"upper"`
-	Version        int32                 `json:"version"`
-	Sign           int32                 `json:"sign"`
-	Remain         int32                 `json:"remain"`
-	Feed           int32                 `json:"feed"`
-	Imei           string                `json:"imei" `
-	Temp           int32                 `json:"temp"`
-	Gasp           int32                 `json:"gasp"`
-	ActiveDateTime string                `json:"activeDateTime"`
-	IsShow         pasturePb.IsShow_Kind `json:"isShow"`
-	ReceiveNumber  string                `json:"receiveNumber"`
-	CreatedAt      int64                 `json:"createdAt"`
-	UpdatedAt      int64                 `json:"updatedAt"`
+	Id              int64                         `json:"id"`
+	PastureId       int64                         `json:"pastureId"`
+	Uuid            string                        `json:"uuid"`
+	NeckRingNumber  string                        `json:"neckRingNumber"`  // 脖环号 (对应老表字段EID1)
+	ActiveDate      string                        `json:"activeDate"`      // 采集时间-天(YYYY-MM-DD对应老表字段heatdate)
+	Hours           int32                         `json:"hours"`           // 采集时间-小时(hours)
+	Frameid         int32                         `json:"frameid"`         // 采集时长(对应老表frameid)
+	Rumina          int32                         `json:"rumina"`          // 反刍时长(rumaina)
+	Intake          int32                         `json:"intake"`          // 采食时长(intake)
+	Inactive        int32                         `json:"inactive"`        // 静止时间(inactive)
+	Gasp            int32                         `json:"gasp"`            // 喘息时长(Other)
+	High            int32                         `json:"high"`            // 活动量(activitys)
+	Active          int32                         `json:"active"`          // 运动时长(High)
+	Other           int32                         `json:"other"`           // 其他时长
+	FirmwareVersion int32                         `json:"firmwareVersion"` // 固件版本(对应老表Version)
+	HardwareVersion int32                         `json:"hardwareVersion"` // 硬件版本
+	Remain          int32                         `json:"remain"`          // 脖环剩余数据量,57之后为上一次上报结果
+	Voltage         int32                         `json:"voltage"`         // 电池电压
+	RestartReason   int32                         `json:"restartReason"`   // 脖环重启原因 (对应老表HIB)
+	Upper           int32                         `json:"upper"`           // 脖环正向比例发射功率
+	ActiveDateType  pasturePb.ActiveTimeType_Kind `json:"ActiveDateTimeType"`
+	IsShow          pasturePb.IsShow_Kind         `json:"isShow"`
+	Imei            string                        `json:"imei"`          // 4G模组IMEI(imei)
+	ReceiveNumber   string                        `json:"receiveNumber"` // 接收器编号
+	CreatedAt       int64                         `json:"createdAt"`
+	UpdatedAt       int64                         `json:"updatedAt"`
 }
 
 func (s *NeckRingError) TableName() string {

+ 28 - 33
model/neck_ring_original.go

@@ -9,36 +9,32 @@ import (
 )
 
 type NeckRingOriginal struct {
-	Id                 int64                         `json:"id"`
-	Uuid               string                        `json:"uuid"`
-	FrameId            int32                         `json:"frameId"`
-	CowId              string                        `json:"cowId"`
-	Low                int32                         `json:"low"`
-	High               int32                         `json:"high"`
-	Rumina             int32                         `json:"rumina"`
-	Active             int32                         `json:"active"`
-	Intake             int32                         `json:"intake"`
-	Inactive           int32                         `json:"inactive"`
-	Other              int32                         `json:"other"`
-	Voltage            int32                         `json:"voltage"`
-	Upper              int32                         `json:"upper"`
-	Version            int32                         `json:"version"`
-	Csq                int32                         `json:"csq"`
-	Sign               int32                         `json:"sign"`
-	Remain             int32                         `json:"remain"`
-	Feed               int32                         `json:"feed"`
-	Imei               string                        `json:"imei"`
-	Nccid              string                        `json:"nccid"`
-	Temp               int32                         `json:"temp"`
-	Gasp               int32                         `json:"gasp"`
-	Hours              int32                         `json:"hours"`
-	ActiveDate         string                        `json:"activeDate"`
-	ActiveDateType     pasturePb.ActiveTimeType_Kind `json:"ActiveDateTimeType"`
-	IsShow             pasturePb.IsShow_Kind         `json:"isShow"`
-	ReceiveNumber      string                        `json:"receiveNumber"`
-	ShortReceiveNumber string                        `json:"shortReceiveNumber"`
-	CreatedAt          int64                         `json:"createdAt"`
-	UpdatedAt          int64                         `json:"updatedAt"`
+	Id              int64                         `json:"id"`
+	PastureId       int64                         `json:"pastureId"`
+	Uuid            string                        `json:"uuid"`
+	NeckRingNumber  string                        `json:"neckRingNumber"`  // 脖环号 (对应老表字段EID1)
+	ActiveDate      string                        `json:"activeDate"`      // 采集时间-天(YYYY-MM-DD对应老表字段heatdate)
+	Hours           int32                         `json:"hours"`           // 采集时间-小时(hours)
+	Frameid         int32                         `json:"frameid"`         // 采集时长(对应老表frameid)
+	Rumina          int32                         `json:"rumina"`          // 反刍时长(rumaina)
+	Intake          int32                         `json:"intake"`          // 采食时长(intake)
+	Inactive        int32                         `json:"inactive"`        // 静止时间(inactive)
+	Gasp            int32                         `json:"gasp"`            // 喘息时长(Other)
+	High            int32                         `json:"high"`            // 活动量(activitys)
+	Active          int32                         `json:"active"`          // 运动时长(High)
+	Other           int32                         `json:"other"`           // 其他时长
+	FirmwareVersion int32                         `json:"firmwareVersion"` // 固件版本(对应老表Version)
+	HardwareVersion int32                         `json:"hardwareVersion"` // 硬件版本
+	Remain          int32                         `json:"remain"`          // 脖环剩余数据量,57之后为上一次上报结果
+	Voltage         int32                         `json:"voltage"`         // 电池电压
+	RestartReason   int32                         `json:"restartReason"`   // 脖环重启原因 (对应老表HIB)
+	Upper           int32                         `json:"upper"`           // 脖环正向比例发射功率
+	ActiveDateType  pasturePb.ActiveTimeType_Kind `json:"ActiveDateTimeType"`
+	IsShow          pasturePb.IsShow_Kind         `json:"isShow"`
+	Imei            string                        `json:"imei"`          // 4G模组IMEI(imei)
+	ReceiveNumber   string                        `json:"receiveNumber"` // 接收器编号
+	CreatedAt       int64                         `json:"createdAt"`
+	UpdatedAt       int64                         `json:"updatedAt"`
 }
 
 func (n *NeckRingOriginal) TableName() string {
@@ -96,7 +92,7 @@ func (n *NeckRingOriginalMerge) SumAvg() {
 
 type NeckRingOriginalMap map[string]*NeckRingOriginalMerge
 
-func (n NeckRingOriginalMap) ForMatData(defaultWeeklyActive int32, getCowInfo func(string) *Cow) []*NeckActiveHabit {
+func (n NeckRingOriginalMap) ForMatData() []*NeckActiveHabit {
 	res := make([]*NeckActiveHabit, 0)
 	for key, v := range n {
 		keyStrList := strings.Split(key, JoinKey)
@@ -107,8 +103,7 @@ func (n NeckRingOriginalMap) ForMatData(defaultWeeklyActive int32, getCowInfo fu
 		activeDate := keyStrList[1]
 		frameId := keyStrList[2]
 		frameIdInt, _ := strconv.Atoi(frameId)
-		cowInfo := getCowInfo(imei)
-		res = append(res, NewNeckActiveHabit(defaultWeeklyActive, int32(frameIdInt), activeDate, imei, cowInfo, v))
+		res = append(res, NewNeckActiveHabit(int32(frameIdInt), activeDate, imei, v))
 	}
 	return res
 }

+ 26 - 22
model/neck_ring_unregist.go

@@ -3,28 +3,32 @@ package model
 import pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
 
 type NeckRingUnRegister struct {
-	Id             int64                 `json:"id"`
-	Uuid           string                `json:"uuid"`
-	FrameId        int32                 `json:"frameId"`
-	Low            int32                 `json:"low"`
-	High           int32                 `json:"high"`
-	Rumina         int32                 `json:"rumina"`
-	Active         int32                 `json:"active"`
-	Intake         int32                 `json:"intake"`
-	Inactive       int32                 `json:"inactive"`
-	Other          int32                 `json:"other"`
-	Voltage        int32                 `json:"voltage"`
-	Upper          int32                 `json:"upper"`
-	Version        int32                 `json:"version"`
-	Sign           int32                 `json:"sign"`
-	Remain         int32                 `json:"remain"`
-	Feed           int32                 `json:"feed"`
-	Imei           string                `json:"imei" `
-	Temp           int32                 `json:"temp"`
-	ActiveDateTime string                `json:"activeDateTime"`
-	IsShow         pasturePb.IsShow_Kind `json:"isShow"`
-	CreatedAt      int64                 `json:"createdAt"`
-	UpdatedAt      int64                 `json:"updatedAt"`
+	Id              int64                         `json:"id"`
+	PastureId       int64                         `json:"pastureId"`
+	Uuid            string                        `json:"uuid"`
+	NeckRingNumber  string                        `json:"neckRingNumber"`  // 脖环号 (对应老表字段EID1)
+	ActiveDate      string                        `json:"activeDate"`      // 采集时间-天(YYYY-MM-DD对应老表字段heatdate)
+	Hours           int32                         `json:"hours"`           // 采集时间-小时(hours)
+	Frameid         int32                         `json:"frameid"`         // 采集时长(对应老表frameid)
+	Rumina          int32                         `json:"rumina"`          // 反刍时长(rumaina)
+	Intake          int32                         `json:"intake"`          // 采食时长(intake)
+	Inactive        int32                         `json:"inactive"`        // 静止时间(inactive)
+	Gasp            int32                         `json:"gasp"`            // 喘息时长(Other)
+	High            int32                         `json:"high"`            // 活动量(activitys)
+	Active          int32                         `json:"active"`          // 运动时长(High)
+	Other           int32                         `json:"other"`           // 其他时长
+	FirmwareVersion int32                         `json:"firmwareVersion"` // 固件版本(对应老表Version)
+	HardwareVersion int32                         `json:"hardwareVersion"` // 硬件版本
+	Remain          int32                         `json:"remain"`          // 脖环剩余数据量,57之后为上一次上报结果
+	Voltage         int32                         `json:"voltage"`         // 电池电压
+	RestartReason   int32                         `json:"restartReason"`   // 脖环重启原因 (对应老表HIB)
+	Upper           int32                         `json:"upper"`           // 脖环正向比例发射功率
+	ActiveDateType  pasturePb.ActiveTimeType_Kind `json:"ActiveDateTimeType"`
+	IsShow          pasturePb.IsShow_Kind         `json:"isShow"`
+	Imei            string                        `json:"imei"`          // 4G模组IMEI(imei)
+	ReceiveNumber   string                        `json:"receiveNumber"` // 接收器编号
+	CreatedAt       int64                         `json:"createdAt"`
+	UpdatedAt       int64                         `json:"updatedAt"`
 }
 
 func (s *NeckRingUnRegister) TableName() string {

+ 11 - 10
model/system_configure.go

@@ -3,16 +3,17 @@ package model
 import pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
 
 const (
-	ActiveLowest    = "active_lowest"
-	RuminaLowest    = "rumina_lowest"
-	WeeklyActive    = "weekly_active"
-	XRuminaDisc     = "x_rumina_disc"
-	XChangeDiscount = "x_change_discount"
-	ActiveLow       = "active_low"
-	ActiveMiddle    = "active_middle"
-	ActiveHigh      = "active_high"
-	MaxEstrus       = "max_estrus"
-	MaxHabit        = "max_habit"
+	ActiveLowest        = "active_lowest"
+	RuminaLowest        = "rumina_lowest"
+	WeeklyActive        = "weekly_active"
+	XRuminaDisc         = "x_rumina_disc"
+	XChangeDiscount     = "x_change_discount"
+	ActiveLow           = "active_low"
+	ActiveMiddle        = "active_middle"
+	ActiveHigh          = "active_high"
+	MaxEstrus           = "max_estrus"
+	MaxHabit            = "max_habit"
+	UpdateOriginalMaxId = "update_original_max_id"
 )
 
 type SystemConfigure struct {

+ 0 - 1
module/crontab/interface.go

@@ -31,5 +31,4 @@ type Crontab interface {
 	SameTimePlan() error
 	UpdateSameTime() error
 	SystemBasicCrontab() error
-	NeckRingOriginalMergeData() error
 }

+ 0 - 98
module/crontab/neck_ring.go

@@ -6,14 +6,11 @@ import (
 	"math"
 	"time"
 
-	"gorm.io/gorm"
-
 	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
 	"gitee.com/xuyiping_admin/pkg/xerr"
 )
 
 const (
-	DefaultLimit    = 10000
 	MinChangeFilter = -99
 	MinRuminaFilter = -99
 	MinChewFilter   = -99
@@ -21,101 +18,6 @@ const (
 	DefaultNb       = 30
 )
 
-// NeckRingOriginalMergeData 把脖环数据合并成2个小时的
-func (e *Entry) NeckRingOriginalMergeData() error {
-	// 先看看上次任务有没有执行结束,结束在执行下面的任务
-	if ok := e.IsExistCrontabLog(NeckRingOriginal); ok {
-		return nil
-	}
-
-	e.CreateCrontabLog(NeckRingOriginal)
-	defer func() {
-		e.DeleteCrontabLog(NeckRingOriginal)
-	}()
-
-	limit := e.Cfg.NeckRingLimit
-	if limit <= 0 {
-		limit = DefaultLimit
-	}
-	neckRingList := make([]*model.NeckRingOriginal, 0)
-	if err := e.DB.Model(new(model.NeckRingOriginal)).
-		Where("is_show = ?", pasturePb.IsShow_No).
-		Limit(int(limit)).
-		Find(&neckRingList).Error; err != nil {
-		return xerr.WithStack(err)
-	}
-
-	if len(neckRingList) <= 0 {
-		return nil
-	}
-
-	neckRingIds := make([]int64, 0)
-	originalMapData := make(map[string]*model.NeckRingOriginalMerge)
-	// 合并成2个小时的
-	for _, v := range neckRingList {
-		neckRingIds = append(neckRingIds, v.Id)
-		xframeId := int(math.Floor(float64(v.FrameId)/10) * 2)
-		mapKey := fmt.Sprintf("%s%s%s%s%d", v.Imei, model.JoinKey, v.ActiveDate, model.JoinKey, xframeId) // 0001/2023-12-04/0 0001/2023-12-03/4
-		if _, ok := originalMapData[mapKey]; !ok {
-			originalMapData[mapKey] = new(model.NeckRingOriginalMerge)
-		}
-		v.IsAvgHours()
-		originalMapData[mapKey].IsMageData(v)
-	}
-
-	// 算平均值
-	for _, v := range originalMapData {
-		v.SumAvg()
-	}
-
-	weeklyActive := &model.SystemConfigure{}
-	if err := e.DB.Model(new(model.SystemConfigure)).
-		Where("name = ?", model.WeeklyActive).
-		Where("is_show = ?", pasturePb.IsShow_Ok).
-		First(weeklyActive).Error; err != nil {
-		return xerr.WithStack(err)
-	}
-
-	// 更新脖环牛只相关信息
-	newNeckActiveHabitList := model.NeckRingOriginalMap(originalMapData).ForMatData(weeklyActive.Value, e.GetCowInfoByImei)
-
-	if err := e.DB.Transaction(func(tx *gorm.DB) error {
-		// 更新已处理过的id
-		if len(neckRingIds) > 0 {
-			if err := tx.Model(new(model.NeckRingOriginal)).
-				Where("id IN ?", neckRingIds).
-				Update("is_show", pasturePb.IsShow_Ok).
-				Error; err != nil {
-				return xerr.WithStack(err)
-			}
-		}
-
-		for _, neckActiveHabit := range newNeckActiveHabitList {
-			// 新数据直接插入
-			if e.IsExistNeckActiveHabit(neckActiveHabit.NeckRingNumber, neckActiveHabit.Frameid) <= 0 {
-				if err := tx.Create(neckActiveHabit).Error; err != nil {
-					return xerr.WithStack(err)
-				}
-				continue
-			}
-
-			// 更新数据
-			historyNeckActiveHabit := e.GetNeckActiveHabit(neckActiveHabit.NeckRingNumber, neckActiveHabit.Frameid)
-			historyNeckActiveHabit.MergeData(neckActiveHabit)
-			if err := tx.Model(new(model.NeckActiveHabit)).
-				Select("rumina", "rumina", "intake", "gasp", "other", "high", "active").
-				Where("id = ?", historyNeckActiveHabit.Id).
-				Updates(historyNeckActiveHabit).Error; err != nil {
-				return xerr.WithStack(err)
-			}
-		}
-		return nil
-	}); err != nil {
-		return xerr.WithStack(err)
-	}
-	return nil
-}
-
 func (e *Entry) ActiveHabit() error {
 	lastMaxHabitId := e.GetSystemConfigure(model.MaxHabit).Value
 	currentMaxHabit := &model.NeckActiveHabit{}

+ 0 - 27
module/crontab/sql.go

@@ -33,33 +33,6 @@ func (e *Entry) GetPenMapList() (map[int32]*model.Pen, error) {
 	return penMap, nil
 }
 
-func (e *Entry) GetCowInfoByImei(imei string) *model.Cow {
-	res := &model.Cow{}
-	if err := e.DB.Model(new(model.Cow)).Where("neck_ring_number = ?", imei).First(res).Error; err != nil {
-		return nil
-	}
-	return res
-}
-
-func (e *Entry) IsExistNeckActiveHabit(neckRingNumber string, frameId int32) int64 {
-	count := int64(0)
-	if err := e.DB.Model(new(model.NeckActiveHabit)).
-		Where("neck_ring_number = ? and frameid = ?", neckRingNumber, frameId).Count(&count).Error; err != nil {
-		return 0
-	}
-	return count
-}
-
-func (e *Entry) GetNeckActiveHabit(neckRingNumber string, frameId int32) *model.NeckActiveHabit {
-	res := &model.NeckActiveHabit{}
-	if err := e.DB.Model(new(model.NeckActiveHabit)).
-		Where("neck_ring_number = ? and frameid = ?", neckRingNumber, frameId).
-		First(res).Error; err != nil {
-		return nil
-	}
-	return res
-}
-
 func (e *Entry) GetSystemConfigure(name string) *model.SystemConfigure {
 	res := &model.SystemConfigure{}
 	if err := e.DB.Model(new(model.SystemConfigure)).

+ 203 - 121
module/mqtt/handle.go

@@ -2,11 +2,16 @@ package mqtt
 
 import (
 	"encoding/json"
+	"fmt"
 	"kpt-pasture/model"
 	"kpt-pasture/util"
+	"math"
 	"strconv"
 	"strings"
 	"sync"
+	"time"
+
+	"github.com/jinzhu/copier"
 
 	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
 
@@ -18,69 +23,70 @@ import (
 
 type DataInsertNeckRingLog struct {
 	NeckRingOriginalData   []*model.NeckRingOriginal
-	NeckRingErrorData      []*model.NeckRingOriginal
-	NeckRingUnRegisterData []*model.NeckRingOriginal
+	NeckRingErrorData      []*model.NeckRingError
+	NeckRingUnRegisterData []*model.NeckRingUnRegister
 	Mx                     *sync.RWMutex
 }
 
-var FrameId = map[int32]int32{
-	1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 8: 8,
-	11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 16, 18: 18,
-	21: 21, 22: 22, 23: 23, 24: 24, 25: 25, 26: 26, 28: 28,
-	31: 31, 32: 32, 33: 33, 34: 34, 35: 35, 36: 36, 38: 38,
-	41: 41, 42: 42, 43: 43, 44: 44, 45: 45, 46: 46, 48: 48,
-	51: 51, 52: 52, 53: 53, 54: 54, 55: 55, 56: 56, 58: 58,
-	61: 61, 62: 62, 63: 63, 64: 64, 65: 65, 66: 66, 68: 68,
-	71: 71, 72: 72, 73: 73, 74: 74, 75: 75, 76: 76, 78: 78,
-	81: 81, 82: 82, 83: 83, 84: 84, 85: 85, 86: 86, 88: 88,
-	91: 91, 92: 92, 93: 93, 94: 94, 95: 95, 96: 96, 98: 98,
-	101: 101, 102: 102, 103: 103, 104: 104, 105: 105, 106: 106, 108: 108,
-	111: 111, 112: 112, 113: 113, 114: 114, 115: 115, 116: 116, 118: 118,
-}
-
 var (
-	batchSize = 10
-	batchList = make([]*model.NeckRingOriginal, 0, batchSize)
+	batchSize    = 10
+	batchList    = make([]*model.NeckRingOriginal, 0, batchSize)
+	DefaultLimit = int32(10000)
+	DSMLog       = &DataInsertNeckRingLog{
+		NeckRingOriginalData:   make([]*model.NeckRingOriginal, 0),
+		NeckRingErrorData:      make([]*model.NeckRingError, 0),
+		NeckRingUnRegisterData: make([]*model.NeckRingUnRegister, 0),
+		Mx:                     &sync.RWMutex{},
+	}
 )
 
 func (e *Entry) NeckRingHandle(data []byte) {
-	DSMLog := &DataInsertNeckRingLog{
-		NeckRingOriginalData:   make([]*model.NeckRingOriginal, 0),
-		NeckRingErrorData:      make([]*model.NeckRingOriginal, 0),
-		NeckRingUnRegisterData: make([]*model.NeckRingOriginal, 0),
-		Mx:                     &sync.RWMutex{},
+	newData := e.MsgDataFormat2(data)
+	if newData == nil || len(newData) <= 0 {
+		return
+	}
+	zaplog.Info("NeckRingHandle", zap.Any("data", newData), zap.Any("original", string(data)), zap.Any("time", time.Now().Unix()))
+	batchList = append(batchList, newData...)
+	if len(batchList) >= batchSize {
+		e.processBatch(batchList)
+		batchList = batchList[:0] // 清空 batchList
 	}
+}
 
-	newData := e.MsgDataFormat(data)
-	if newData != nil {
-		batchList = append(batchList, newData)
-		if len(batchList) >= batchSize {
-			DSMLog.Mx.Lock()
-			for _, batch := range batchList {
-				// 异常脖环数据
-				if _, ok := FrameId[batch.FrameId]; !ok {
-					DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, batch)
-					continue
-				}
-				// 未佩戴的脖环数据
-				if ok := e.NeckRingIsBind(batch.Imei); !ok {
-					DSMLog.NeckRingUnRegisterData = append(DSMLog.NeckRingUnRegisterData, batch)
-					continue
-				}
-				// 正常脖环数据
-				DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, batch)
-			}
+// 处理批量数据
+func (e *Entry) processBatch(batchList []*model.NeckRingOriginal) {
+	// 初始化分类数据
+	var (
+		errorData    []*model.NeckRingError
+		originalData []*model.NeckRingOriginal
+	)
 
-			if err := e.CreatedData(DSMLog); err != nil {
-				zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
-			}
-			DSMLog.Mx.Unlock()
-			DSMLog.NeckRingUnRegisterData = make([]*model.NeckRingOriginal, 0)
-			DSMLog.NeckRingErrorData = make([]*model.NeckRingOriginal, 0)
-			DSMLog.NeckRingOriginalData = make([]*model.NeckRingOriginal, 0)
-			batchList = batchList[:0]
+	// 分类数据
+	for _, batch := range batchList {
+		// 异常脖环数据
+		if ok := util.IsValidFrameId(batch.Frameid); !ok {
+			var ed model.NeckRingError
+			copier.Copy(&ed, &batch)
+			errorData = append(errorData, &ed)
+		} else {
+			originalData = append(originalData, batch)
 		}
 	}
+
+	// 更新日志
+	DSMLog.Mx.Lock()
+	defer DSMLog.Mx.Unlock()
+	DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, errorData...)
+	DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, originalData...)
+
+	// 写入数据
+	if err := e.CreatedData(DSMLog); err != nil {
+		zaplog.Error("Failed to create data", zap.Any("err", err), zap.Any("dataList", DSMLog))
+	}
+
+	// 清空日志
+	DSMLog.NeckRingErrorData = DSMLog.NeckRingErrorData[:0]
+	DSMLog.NeckRingOriginalData = DSMLog.NeckRingOriginalData[:0]
 }
 
 func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
@@ -96,12 +102,6 @@ func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
 				return xerr.WithStack(err)
 			}
 		}
-
-		if len(DSMLog.NeckRingUnRegisterData) > 0 {
-			if err := e.DB.Create(DSMLog.NeckRingUnRegisterData).Error; err != nil {
-				return xerr.WithStack(err)
-			}
-		}
 		return nil
 	}); err != nil {
 		return xerr.WithStack(err)
@@ -109,7 +109,7 @@ func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
 	return nil
 }
 
-func (e *Entry) MsgDataFormat(msg []byte) *model.NeckRingOriginal {
+func (e *Entry) MsgDataFormat(msg []byte) []*model.NeckRingOriginal {
 	msgData := make(map[string]interface{})
 	pairs := strings.Split(util.MsgFormat(string(msg)), " ")
 	for _, pair := range pairs {
@@ -153,20 +153,6 @@ func (e *Entry) MsgDataFormat(msg []byte) *model.NeckRingOriginal {
 		}
 	}
 
-	cowId := ""
-	if cowIdInter, ok := msgData["cowid"]; ok {
-		if cowIdStr, ok := cowIdInter.(string); ok {
-			cowId = cowIdStr
-		}
-	}
-
-	csq := int64(0)
-	if csqInter, ok := msgData["csq"]; ok {
-		if csq32, ok := csqInter.(string); ok {
-			csq, _ = strconv.ParseInt(csq32, 10, 64)
-		}
-	}
-
 	temp := float64(0)
 	if tempInter, ok := msgData["Temp"]; ok {
 		if tempFloat, ok := tempInter.(string); ok {
@@ -238,13 +224,6 @@ func (e *Entry) MsgDataFormat(msg []byte) *model.NeckRingOriginal {
 		}
 	}
 
-	other := int64(0)
-	if otherInter, ok := msgData["other"]; ok {
-		if other32, ok := otherInter.(string); ok {
-			other, _ = strconv.ParseInt(other32, 10, 64)
-		}
-	}
-
 	reMain := int64(0)
 	if reMainInter, ok := msgData["Remain"]; ok {
 		if reMain32, ok := reMainInter.(string); ok {
@@ -258,6 +237,27 @@ func (e *Entry) MsgDataFormat(msg []byte) *model.NeckRingOriginal {
 			}
 		}
 	}
+	/*cowId := ""
+	if cowIdInter, ok := msgData["cowid"]; ok {
+		if cowIdStr, ok := cowIdInter.(string); ok {
+			cowId = cowIdStr
+		}
+	}
+
+	csq := int64(0)
+	if csqInter, ok := msgData["csq"]; ok {
+		if csq32, ok := csqInter.(string); ok {
+			csq, _ = strconv.ParseInt(csq32, 10, 64)
+		}
+	}
+
+	other := int64(0)
+	if otherInter, ok := msgData["other"]; ok {
+		if other32, ok := otherInter.(string); ok {
+			other, _ = strconv.ParseInt(other32, 10, 64)
+		}
+	}
+
 	nccId := ""
 	if nccIdInter, ok := msgData["nccid"]; ok {
 		if nccIdStr, ok := nccIdInter.(string); ok {
@@ -265,56 +265,138 @@ func (e *Entry) MsgDataFormat(msg []byte) *model.NeckRingOriginal {
 		}
 	}
 
-	return &model.NeckRingOriginal{
-		Version:  int32(softVer),
-		Uuid:     uuid,
-		CowId:    cowId,
-		FrameId:  int32(frameId),
-		Csq:      int32(csq),
-		Temp:     int32(temp * 100),
-		Imei:     imei,
-		Active:   int32(active),
-		Inactive: int32(inAction),
-		Rumina:   int32(ruMina),
-		Intake:   int32(intake),
-		Gasp:     int32(gasp),
-		Other:    int32(other),
-		Remain:   int32(reMain),
-		Nccid:    nccId,
+	*/
+
+	return []*model.NeckRingOriginal{
+		{
+			FirmwareVersion: int32(softVer),
+			Uuid:            uuid,
+			Frameid:         int32(frameId),
+			ReceiveNumber:   imei,
+			Active:          int32(active),
+			Inactive:        int32(inAction),
+			Rumina:          int32(ruMina),
+			Intake:          int32(intake),
+			Gasp:            int32(gasp),
+			Remain:          int32(reMain),
+		},
 	}
 }
 
-func (e *Entry) MsgDataFormat2(msg []byte) *model.NeckRingOriginal {
-	neckLog := &Behavior{}
-	if err := json.Unmarshal(msg, neckLog); err != nil {
+func (e *Entry) MsgDataFormat2(msg []byte) []*model.NeckRingOriginal {
+	neckLogList := &NeckRingWrapper{}
+	if err := json.Unmarshal(msg, neckLogList); err != nil {
 		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
 	}
-	if neckLog.Imei != "" {
-		// 存储到数据库
-		activeDate, hours := util.GetNeckRingActiveTimer(neckLog.FrameId)
-		voltage, _ := strconv.ParseInt(strconv.FormatInt(int64(neckLog.BAT), 16), 10, 64)
+
+	res := make([]*model.NeckRingOriginal, 0)
+	for _, neckLog := range neckLogList.NeckRing.NeckPck {
+		activeDate, hours := util.GetNeckRingActiveTimer(neckLog.Frameid)
 		activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
-		if neckLog.FrameId%10 == 8 {
+		if neckLog.Frameid%10 == 8 {
 			activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
 		}
-		return &model.NeckRingOriginal{
-			Uuid:           neckLog.UUID,
-			Imei:           neckLog.ECowId,
-			ActiveDate:     activeDate,
-			Hours:          int32(hours),
-			FrameId:        neckLog.FrameId,
-			Rumina:         neckLog.RuMina,
-			Intake:         neckLog.Intake,
-			Inactive:       neckLog.Inactive,
-			Other:          neckLog.Other,
-			High:           neckLog.Activitys,
-			Active:         neckLog.High,
-			Voltage:        int32(voltage),
-			Version:        neckLog.Sver,
-			Remain:         neckLog.Remain,
-			ReceiveNumber:  neckLog.Imei,
-			ActiveDateType: activeDateTimeType,
+		res = append(res, &model.NeckRingOriginal{
+			Uuid:            neckLog.UUID,
+			NeckRingNumber:  fmt.Sprintf("%d", neckLog.Ecowid),
+			ActiveDate:      activeDate,
+			Hours:           int32(hours),
+			Frameid:         neckLog.Frameid,
+			Rumina:          neckLog.Rumina,
+			Intake:          neckLog.Intake,
+			Inactive:        neckLog.Inactive,
+			Gasp:            neckLog.Other,
+			High:            neckLog.Activitys,
+			Active:          neckLog.High,
+			FirmwareVersion: neckLog.Sver,
+			HardwareVersion: neckLog.Hver,
+			Remain:          neckLog.Remain,
+			Voltage:         neckLog.BAT,
+			RestartReason:   neckLog.STATUS,
+			Upper:           neckLog.UpPer,
+			Imei:            neckLog.Imei,
+			ReceiveNumber:   neckLog.Imei,
+			ActiveDateType:  activeDateTimeType,
+			IsShow:          pasturePb.IsShow_No,
+		})
+	}
+	return res
+}
+
+// NeckRingOriginalMergeData 把脖环数据合并成2个小时的
+func (e *Entry) NeckRingOriginalMergeData() {
+	limit := e.Cfg.NeckRingLimit
+	if limit <= 0 {
+		limit = DefaultLimit
+	}
+
+	updateOriginalMaxId := e.GetSystemConfigure(model.MaxEstrus).Value
+	neckRingList := make([]*model.NeckRingOriginal, 0)
+	if err := e.DB.Model(new(model.NeckRingOriginal)).
+		Where("id > ?", updateOriginalMaxId).
+		Order("id asc").Limit(int(limit)).
+		Find(&neckRingList).Error; err != nil {
+		return
+	}
+
+	if len(neckRingList) <= 0 {
+		return
+	}
+
+	originalMapData := make(map[string]*model.NeckRingOriginalMerge)
+	// 合并成2个小时的
+	for _, v := range neckRingList {
+		xframeId := int(math.Floor(float64(v.Frameid)/10) * 2)
+		mapKey := fmt.Sprintf("%s%s%s%s%d", v.NeckRingNumber, model.JoinKey, v.ActiveDate, model.JoinKey, xframeId) // 0001/2023-12-04/0 0001/2023-12-03/4
+		if _, ok := originalMapData[mapKey]; !ok {
+			originalMapData[mapKey] = new(model.NeckRingOriginalMerge)
 		}
+		v.IsAvgHours()
+		originalMapData[mapKey].IsMageData(v)
+	}
+
+	// 算平均值
+	for _, v := range originalMapData {
+		v.SumAvg()
+	}
+
+	// 更新脖环牛只相关信息
+	newNeckActiveHabitList := model.NeckRingOriginalMap(originalMapData).ForMatData()
+	if err := e.DB.Transaction(func(tx *gorm.DB) error {
+		// 更新已处理过的id
+		if err := tx.Model(new(model.SystemConfigure)).
+			Where("name = ?", model.UpdateOriginalMaxId).
+			Update("value", neckRingList[len(neckRingList)-1].Id).
+			Error; err != nil {
+			return xerr.WithStack(err)
+		}
+
+		for _, neckActiveHabit := range newNeckActiveHabitList {
+			// 新数据直接插入 todo 待优化
+			historyNeckActiveHabit, ct := e.IsExistNeckActiveHabit(neckActiveHabit.NeckRingNumber, neckActiveHabit.HeatDate, neckActiveHabit.Frameid)
+			if ct <= 0 {
+				if err := tx.Create(neckActiveHabit).Error; err != nil {
+					return xerr.WithStack(err)
+				}
+				continue
+			}
+
+			if historyNeckActiveHabit == nil {
+				zaplog.Error("NeckRingOriginalMergeData", zap.Any("historyNeckActiveHabit", historyNeckActiveHabit))
+				continue
+			}
+			// 更新数据
+			historyNeckActiveHabit.MergeData(neckActiveHabit)
+			if err := tx.Model(new(model.NeckActiveHabit)).
+				Select("rumina", "intake", "inactive", "gasp", "other", "high", "active").
+				Where("id = ?", historyNeckActiveHabit.Id).
+				Updates(historyNeckActiveHabit).Error; err != nil {
+				return xerr.WithStack(err)
+			}
+		}
+		return nil
+	}); err != nil {
+		zaplog.Error("NeckRingOriginalMergeData", zap.Any("transaction", err))
+		return
 	}
-	return nil
 }

+ 1 - 0
module/mqtt/interface.go

@@ -22,4 +22,5 @@ type Entry struct {
 
 type DataHandle interface {
 	NeckRingHandle(msg []byte)
+	NeckRingOriginalMergeData()
 }

+ 6 - 6
module/mqtt/model.go

@@ -20,26 +20,26 @@ type NewBehavior struct {
 
 type Behavior struct {
 	UUID      string `json:"uuid"`
-	ECowId    string `json:"ecowid"`
-	FrameId   int32  `json:"frameid"`
+	Ecowid    int32  `json:"ecowid"`
+	Frameid   int32  `json:"frameid"`
 	High      int32  `json:"High"`
 	Intake    int32  `json:"Intake"`
-	RuMina    int32  `json:"Rumina"`
+	Rumina    int32  `json:"Rumina"`
 	Other     int32  `json:"Other"`
 	Activitys int32  `json:"activitys"`
 	Inactive  int32  `json:"inactive"`
 	Sver      int32  `json:"Sver"`
+	Hver      int32  `json:"Hver"`
 	Remain    int32  `json:"Remain"`
-	RFRssi    int32  `json:"RFRssi"`
 	STATUS    int32  `json:"STATUS"`
+	UpPer     int32  `json:"UpPer"`
 	BAT       int32  `json:"BAT"`
 	Imei      string `json:"imei"`
-	Gasp      int32  `json:"gasp"`
 }
 
 type NeckRingWrapper struct {
 	Type     string `json:"type"`
 	NeckRing struct {
-		NeckPck []*Behavior `json:"neck"` // neck_pck neck
+		NeckPck []Behavior `json:"neck"` // neck_pck neck
 	} `json:"NeckRing"`
 }

+ 34 - 0
module/mqtt/sql.go

@@ -19,3 +19,37 @@ func (e *Entry) NeckRingIsBind(number string) bool {
 	}
 	return false
 }
+
+func (e *Entry) GetCowInfoByImei(imei string) *model.Cow {
+	res := &model.Cow{}
+	if err := e.DB.Model(new(model.Cow)).Where("neck_ring_number = ?", imei).First(res).Error; err != nil {
+		return nil
+	}
+	return res
+}
+
+func (e *Entry) GetSystemConfigure(name string) *model.SystemConfigure {
+	res := &model.SystemConfigure{}
+	if err := e.DB.Model(new(model.SystemConfigure)).
+		Where("name = ?", name).
+		Where("is_show = ?", pasturePb.IsShow_Ok).
+		First(res).Error; err != nil {
+		return nil
+	}
+	return res
+}
+
+func (e *Entry) IsExistNeckActiveHabit(neckRingNumber, heatDate string, frameId int32) (*model.NeckActiveHabit, int64) {
+	count := int64(0)
+	res := &model.NeckActiveHabit{}
+	if err := e.DB.Model(new(model.NeckActiveHabit)).
+		Where("neck_ring_number = ?", neckRingNumber).
+		Where("heat_date = ?", heatDate).
+		Where("frameid = ?", frameId).
+		Count(&count).
+		First(res).
+		Error; err != nil {
+		return nil, 0
+	}
+	return res, count
+}

+ 10 - 10
service/mqtt/consumer.go

@@ -10,26 +10,26 @@ import (
 var (
 	bufferPool = sync.Pool{
 		New: func() interface{} {
-			return make([]byte, 1024) // 根据实际情况调整缓冲区大小
+			return make([]byte, 1024*50) // 根据实际情况调整缓冲区大小
 		},
 	}
-	readMsgChan  = make(chan []byte, 2)
-	writeMsgChan = make(chan []byte, 2)
+	readMsgChan  = make(chan []byte, 5)
+	writeMsgChan = make(chan []byte, 5)
 )
 
 // Consumer 处理收到的消息
 func (s *IMqttClient) Consumer(handle func([]byte)) {
-	ac := time.NewTicker(2 * time.Minute)
+	tc := time.NewTicker(2 * time.Minute)
+	defer tc.Stop()
+
 	var allCnt int32 = 0
 	go func() {
-		for {
-			select {
-			case <-ac.C:
-				fmt.Println("allCnt:", allCnt)
-				atomic.StoreInt32(&allCnt, 0)
-			}
+		for range tc.C {
+			fmt.Println("allCnt:", allCnt)
+			atomic.StoreInt32(&allCnt, 0)
 		}
 	}()
+
 	for {
 		select {
 		case msg := <-writeMsgChan:

+ 52 - 11
service/mqtt/interface.go

@@ -1,6 +1,7 @@
 package mqtt
 
 import (
+	"context"
 	"fmt"
 	"kpt-pasture/config"
 	"kpt-pasture/util"
@@ -31,6 +32,12 @@ type IMqttServer interface {
 	Close()
 }
 
+var (
+	maxRetryAttempts = 500             // 最大重试次数
+	initialRetryWait = 5 * time.Minute // 初始重试间隔
+	maxRetryWait     = 1 * time.Hour   // 最大重试间隔
+)
+
 var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
 	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
 }
@@ -41,17 +48,35 @@ var connectHandler = func(client golangMqtt.Client) {
 
 var connectLostHandler = func(client golangMqtt.Client, err error) {
 	zaplog.Error("connectLost-err", zap.Any("err", err.Error()))
-	for {
-		token := client.Connect()
-		if token.Wait() && token.Error() == nil {
-			// 成功重连,更新全局客户端实例
-			connectHandler(client)
-			break
-		}
-		zaplog.Error("ConnectionRetry", zap.Any("err", token.Error()))
-		time.Sleep(5 * time.Second)
+	retryConnect(client, 0)
+}
+
+// 重试连接函数
+func retryConnect(client golangMqtt.Client, attempt int) {
+	if attempt >= maxRetryAttempts {
+		zaplog.Error("Max retry attempts reached, giving up")
+		return
+	}
+
+	// 计算重试间隔(指数退避)
+	retryWait := initialRetryWait * time.Duration(1<<attempt) // 2^attempt 指数增长
+	if retryWait > maxRetryWait {
+		retryWait = maxRetryWait
 	}
-	zaplog.Error("connectLost-success")
+
+	zaplog.Info("Retrying connection", zap.Any("attempt", attempt+1), zap.Any("waitTime", retryWait))
+	time.Sleep(retryWait)
+
+	// 尝试重新连接
+	token := client.Connect()
+	if token.Wait() && token.Error() == nil {
+		zaplog.Info("Reconnected successfully")
+		connectHandler(client)
+		return
+	}
+
+	zaplog.Error("Connection retry failed", zap.Any("err", token.Error()))
+	retryConnect(client, attempt+1) // 递归调用,继续重试
 }
 
 func NewServer(config *config.AppConfig) IMqttServer {
@@ -69,9 +94,10 @@ func NewServer(config *config.AppConfig) IMqttServer {
 	opts.SetDefaultPublishHandler(messagePubHandler)
 	opts.OnConnect = connectHandler
 	opts.OnConnectionLost = connectLostHandler
+
 	client := golangMqtt.NewClient(opts)
 	if token := client.Connect(); token.Wait() && token.Error() != nil {
-		panic(token.Error())
+		retryConnect(client, 0)
 	}
 	return &IMqttClient{Client: client, Config: conf}
 }
@@ -86,13 +112,26 @@ func (s *IMqttClient) Run(enter handleMqtt.Entry) {
 	// 设置信号监听以优雅关闭服务器
 	stop := make(chan os.Signal, 1)
 	signal.Notify(stop, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+	// 每隔30分钟
+	ticker := time.NewTicker(10 * time.Minute)
+	defer ticker.Stop()
+
+	// 创建上下文,用于优雅关闭
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	go func() {
 		for {
 			select {
 			// 等待停止信号
 			case <-stop:
+				cancel()
 				s.Close()
 				return
+			case <-ticker.C:
+				enter.NeckRingOriginalMergeData()
+			case <-ctx.Done():
+				return
 			}
 		}
 	}()
@@ -104,6 +143,8 @@ func (s *IMqttClient) Run(enter handleMqtt.Entry) {
 		select {
 		case readMsgChan <- buffer[:len(msg.Payload())]:
 			bufferPool.Put(buffer)
+		case <-ctx.Done():
+			return
 		}
 	}); token.Wait() && token.Error() != nil {
 		zaplog.Error("Consumer", zap.Any("topic", s.Config.SubTopic), zap.Any("err", token.Error()))

+ 41 - 8
util/util.go

@@ -21,6 +21,24 @@ const (
 	LetterIdxMax  = 63 / LetterIdxBits   // # of letter indices fitting in 63 bits
 )
 
+var (
+	FrameIdMap = map[int32]int32{
+		1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 8: 8, // 00-02
+		11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 16, 18: 18, // 02-04
+		21: 21, 22: 22, 23: 23, 24: 24, 25: 25, 26: 26, 28: 28, // 04-06
+		31: 31, 32: 32, 33: 33, 34: 34, 35: 35, 36: 36, 38: 38, // 06-08
+		41: 41, 42: 42, 43: 43, 44: 44, 45: 45, 46: 46, 48: 48, // 08-10
+		51: 51, 52: 52, 53: 53, 54: 54, 55: 55, 56: 56, 58: 58, // 10-12
+		61: 61, 62: 62, 63: 63, 64: 64, 65: 65, 66: 66, 68: 68, // 12-14
+		71: 71, 72: 72, 73: 73, 74: 74, 75: 75, 76: 76, 78: 78, // 14-16
+		81: 81, 82: 82, 83: 83, 84: 84, 85: 85, 86: 86, 88: 88, // 16-18
+		91: 91, 92: 92, 93: 93, 94: 94, 95: 95, 96: 96, 98: 98, // 18-20
+		101: 101, 102: 102, 103: 103, 104: 104, 105: 105, 106: 106, 108: 108, // 20-22
+		111: 111, 112: 112, 113: 113, 114: 114, 115: 115, 116: 116, 118: 118, // 22-24
+	}
+	SpecialHours = map[int]int{8: 2, 18: 4, 28: 6, 38: 8, 48: 10, 58: 12, 68: 14, 78: 16, 88: 18, 98: 20, 108: 22, 118: 0}
+)
+
 // GenerateRandomNumberString 生成指定长度的数字串
 func GenerateRandomNumberString(n int) string {
 	result := make([]byte, n)
@@ -327,25 +345,34 @@ func MsgFormat(input string) string {
 	return re.ReplaceAllString(input, ":")
 }
 
+// IsValidFrameId 检查 FrameId 是否有效
+func IsValidFrameId(frameId int32) bool {
+	_, ok := FrameIdMap[frameId]
+	return ok
+}
+
 /*
 GetNeckRingActiveTimer
 1. frameId值如果是:1到6代表每天的0点到2点,11-16 代表每天的2点到4点, 21-26 代表每天的4点到6点,31-36代表每天的6点到8点,
 41-46 代表每天的8点到10点,51-56代表每天的10点到12点,61-66代表每天的12-14点,71-76代表每天的14-16点,81-86代表每天的16-18点,
-91-96代表每天的18-20点,101-106代表每天的20-22点,111-116代表每天的22-24点。其中每天数字代表20分钟。如果frameId大于接受时间点,就代表frameId是昨天的。
-2. 如果farmId取值出现8,18,28,3848,58,68,78,88,98,108,118数字分别代表2个小时,从0-2点开始,以此类推。
-帮我根据frameId值,获取对应的时间点
+91-96代表每天的18-20点,101-106代表每天的20-22点,111-116代表每天的22-24点。其中每个数字代表20分钟。
+2. 如果frameId大于当前时间点,并往前推20个小时,如果在这个20小时范围内, 就代表frameId是昨天的。
+3. 如果farmId的值出现8,18,28,3848,58,68,78,88,98,108,118数字分别代表2个小时,从0-2点开始,以此类推。
+帮我根据frameId值,获取对应的时间(YYYY-MM-DD)和小时
 */
 func GetNeckRingActiveTimer(frameId int32) (dateTime string, hours int) {
 	if frameId < 0 || frameId > 118 {
 		return "", 0
 	}
+
+	if _, ok := FrameIdMap[frameId]; !ok {
+		return "", 0
+	}
+
 	nowTime := time.Now()
 	currHour := nowTime.Hour()
 	// 处理2小时的特殊 farmId
-	specialHours := map[int]int{
-		8: 2, 18: 4, 28: 6, 38: 8, 48: 10, 58: 12, 68: 14, 78: 16, 88: 18, 98: 20, 108: 22, 118: 0,
-	}
-	hours, ok := specialHours[int(frameId)]
+	hours, ok := SpecialHours[int(frameId)]
 	day := 0
 	if ok {
 		if hours > currHour {
@@ -359,7 +386,13 @@ func GetNeckRingActiveTimer(frameId int32) (dateTime string, hours int) {
 	units := int(frameId % 10)
 	hours += units / 3
 	if hours > currHour {
-		day = -1
+		for i := 1; i <= 20; i++ {
+			twentyHoursAgo := nowTime.Add(-time.Duration(i) * time.Hour).Hour()
+			if hours == twentyHoursAgo {
+				day = -1
+				break
+			}
+		}
 	}
 	dateTime = nowTime.AddDate(0, 0, day).Format(Layout)
 	return

+ 16 - 10
util/util_test.go

@@ -501,22 +501,28 @@ func TestGetNeckRingActiveTimer(t *testing.T) {
 
 	for i, frameId := range tests.frameId {
 		got, hours := GetNeckRingActiveTimer(frameId)
-		t.Logf("frameId: %d, hours: %d, got :%s", frameId, tests.hours[i], got)
+		t.Logf("frameId: %d, test-hours: %d,hours:%d  got :%s", frameId, tests.hours[i], hours, got)
 		//assert.Equal(t, got, tests.dateTime[i])
 		assert.Equal(t, int32(hours), tests.hours[i])
 	}
 }
 
 func Test_demo(t *testing.T) {
-	ids := []int32{1, 2, 3, 4, 5, 6, 11, 12, 13, 14, 15, 16, 21, 22, 23, 24, 25, 26,
-		31, 32, 33, 34, 35, 36, 41, 42, 43, 44, 45, 46, 51, 52, 53, 54, 55, 56,
-		61, 62, 63, 64, 65, 66, 71, 72, 73, 74, 75, 76, 81, 82, 83, 84, 85, 86,
-		91, 92, 93, 94, 95, 96, 101, 102, 103, 104, 105, 106, 111, 112, 113, 114, 115, 116,
-		8, 18, 28, 38, 48, 58, 68, 78, 88, 98, 108, 118,
+	frameId := 76
+	day := 0
+	nowTime := time.Now()
+	hours := int(math.Floor(float64(frameId)/10) * 2)
+	fmt.Println(hours)
+	units := frameId % 10
+	hours += units / 3
+	fmt.Println(hours)
+	currHours := 13
+	if hours > currHours {
+		if nowTime.Add(-20*time.Hour).Hour() == hours {
+			day = -1
+		}
 	}
+	dateTime := nowTime.AddDate(0, 0, day).Format(Layout)
+	fmt.Println(hours, dateTime, units)
 
-	for _, v := range ids {
-		hours := int(math.Floor(float64(v)/10) * 2)
-		fmt.Println(hours)
-	}
 }