Browse Source

neck_ring: 脖环数据处理

Yi 3 months ago
parent
commit
4f460d0d20

+ 1 - 2
dep/dep.go

@@ -8,7 +8,6 @@ import (
 	moduleMqtt "kpt-pasture/module/mqtt"
 	"kpt-pasture/service/asynqsvc"
 	"kpt-pasture/service/mqtt"
-	"kpt-pasture/service/mqtt2"
 	"kpt-pasture/service/redis"
 	"kpt-pasture/service/sso"
 	"kpt-pasture/service/wechat"
@@ -40,7 +39,7 @@ func Options() []di.HubOption {
 		redis.Module,
 		crontab.Module,
 		mqtt.Module,
-		mqtt2.Module,
+		//mqtt2.Module,
 		moduleMqtt.Module,
 	}
 }

+ 1 - 1
dep/di_crontab.go

@@ -77,7 +77,7 @@ func EntryCrontab(dependency CrontabDependency) *cron.Crontab {
 		panic(err)
 	}
 
-	err = newCrontab.Bind("NeckRing", cs.NeckRing, dependency.CrontabHub.NeckRingData)
+	err = newCrontab.Bind("NeckRing", cs.NeckRing, dependency.CrontabHub.NeckRingMergeData)
 	if err != nil {
 		panic(err)
 	}

+ 1 - 1
go.mod

@@ -3,7 +3,7 @@ module kpt-pasture
 go 1.17
 
 require (
-	gitee.com/xuyiping_admin/go_proto v0.0.0-20241128102506-727966a0f004
+	gitee.com/xuyiping_admin/go_proto v0.0.0-20241203073346-ba8687d6c8de
 	gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/eclipse/paho.mqtt.golang v1.4.3

+ 2 - 0
go.sum

@@ -38,6 +38,8 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 gitee.com/xuyiping_admin/go_proto v0.0.0-20241128102506-727966a0f004 h1:0kHmrqRNqiJuzIGIk+dkAsTd/7iAlPRKPYN5h2aaeTo=
 gitee.com/xuyiping_admin/go_proto v0.0.0-20241128102506-727966a0f004/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
+gitee.com/xuyiping_admin/go_proto v0.0.0-20241203073346-ba8687d6c8de h1:dIATo9IIIOfcQaSISrclzRF0piVoy7bydeG0psN1Lt0=
+gitee.com/xuyiping_admin/go_proto v0.0.0-20241203073346-ba8687d6c8de/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
 gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b h1:w05MxH7yqveRlaRbxHhbif5YjPrJFodRPfOjYhXn7Zk=
 gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b/go.mod h1:8tF25X6pE9WkFCczlNAC0K2mrjwKvhhp02I7o0HtDxY=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=

+ 58 - 0
model/cow_active_habit.go

@@ -0,0 +1,58 @@
+package model
+
+import pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
+
+type CowActiveHabit struct {
+	Id                       int64                 `json:"id"`
+	CowId                    int64                 `json:"cowId"`
+	NeckRingNumber           string                `json:"neckRingNumber"`
+	Lact                     int32                 `json:"lact"`
+	FrameId                  int32                 `json:"frameId"`
+	Rumina                   int32                 `json:"rumina"`
+	Intake                   int32                 `json:"intake"`
+	Inactive                 int32                 `json:"inactive"`
+	Gasp                     int32                 `json:"gasp"`
+	Other                    int32                 `json:"other"`
+	High                     int32                 `json:"high"`
+	Active                   int32                 `json:"active"`
+	Voltage                  int32                 `json:"voltage"`
+	Version                  int32                 `json:"version"`
+	FilterHigh               int32                 `json:"filterHigh"`
+	FilterRumina             int32                 `json:"filterRumina"`
+	FilterChew               int32                 `json:"filterChew"`
+	WeekHigh                 int32                 `json:"weekHigh"`
+	WeekAvgHighHabit         int32                 `json:"weekAvgHighHabit"`
+	WeekAvgRuminaHabit       int32                 `json:"WeekAvgRuminaHabit"`
+	WeekAvgIntakeHabit       int32                 `json:"weekAvgIntakeHabit"`
+	WeekAvgChewHabit         int32                 `json:"weekAvgChewHabit"`
+	WeekAvgInactiveHabit     int32                 `json:"weekAvgInactiveHabit"`
+	WeekAvgOtherHabit        int32                 `json:"weekAvgOtherHabit"`
+	ChangeHigh               int32                 `json:"changeHigh"`
+	ChangeRumina             int32                 `json:"changeRumina"`
+	ChangeChew               int32                 `json:"changeChew"`
+	ChangeAdjust             int32                 `json:"changeAdjust"`
+	ChangeFilter             int32                 `json:"changeFilter"`
+	RuminaFilter             int32                 `json:"ruminaFilter"`
+	ChewFilter               int32                 `json:"chewFilter"`
+	FilterCorrect            int32                 `json:"filterCorrect"`
+	SumRumina                int32                 `json:"sumRumina"`
+	SumIntake                int32                 `json:"sumIntake"`
+	SumInactive              int32                 `json:"sumInactive"`
+	SumAct                   int32                 `json:"sumAct"`
+	SumRuminaBeforeThreeDays int32                 `json:"sumRuminaBeforeThreeDays"`
+	SumIntakeBeforeThreeDays int32                 `json:"sumIntakeBeforeThreeDays"`
+	MinHigh                  int32                 `json:"minHigh"`
+	MaxHigh                  int32                 `json:"maxHigh"`
+	MinChew                  int32                 `json:"minChew"`
+	Score                    int32                 `json:"score"`
+	IsMaxTime                pasturePb.IsShow_Kind `json:"isMaxTime"`
+	ReceiveNumber            int32                 `json:"receiveNumber"`
+	RecodeCount              int32                 `json:"recodeCount"`
+	ActiveTime               string                `json:"activeTime"`
+	CreatedAt                int64                 `json:"createdAt"`
+	UpdatedAt                int64                 `json:"updatedAt"`
+}
+
+func (c *CowActiveHabit) TableName() string {
+	return "cow_active_habit"
+}

+ 34 - 0
model/neck_ring_error.go

@@ -0,0 +1,34 @@
+package model
+
+import pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
+
+type NeckRingError struct {
+	Id             int64                 `json:"id"`
+	Uuid           string                `json:"uuid"`
+	FrameId        int32                 `json:"frameId"`
+	Low            int32                 `json:"low"`
+	High           int32                 `json:"high"`
+	Rumina         int32                 `json:"rumina"`
+	Active         int32                 `json:"active"`
+	Intake         int32                 `json:"intake"`
+	Inactive       int32                 `json:"inactive"`
+	Other          int32                 `json:"other"`
+	Voltage        int32                 `json:"voltage"`
+	Upper          int32                 `json:"upper"`
+	Version        int32                 `json:"version"`
+	Sign           int32                 `json:"sign"`
+	Remain         int32                 `json:"remain"`
+	Feed           int32                 `json:"feed"`
+	Imei           string                `json:"imei" `
+	Temp           int32                 `json:"temp"`
+	Gasp           int32                 `json:"gasp"`
+	ActiveDateTime string                `json:"activeDateTime"`
+	IsShow         pasturePb.IsShow_Kind `json:"isShow"`
+	ReceiveNumber  string                `json:"receiveNumber"`
+	CreatedAt      int64                 `json:"createdAt"`
+	UpdatedAt      int64                 `json:"updatedAt"`
+}
+
+func (s *NeckRingError) TableName() string {
+	return "neck_ring_error"
+}

+ 37 - 0
model/neck_ring_original.go

@@ -0,0 +1,37 @@
+package model
+
+import pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
+
+type NeckRingOriginal struct {
+	Id                 int64                         `json:"id"`
+	Uuid               string                        `json:"uuid"`
+	FrameId            int32                         `json:"frameId"`
+	Low                int32                         `json:"low"`
+	High               int32                         `json:"high"`
+	Rumina             int32                         `json:"rumina"`
+	Active             int32                         `json:"active"`
+	Intake             int32                         `json:"intake"`
+	Inactive           int32                         `json:"inactive"`
+	Other              int32                         `json:"other"`
+	Voltage            int32                         `json:"voltage"`
+	Upper              int32                         `json:"upper"`
+	Version            int32                         `json:"version"`
+	Sign               int32                         `json:"sign"`
+	Remain             int32                         `json:"remain"`
+	Feed               int32                         `json:"feed"`
+	Imei               string                        `json:"imei" `
+	Temp               int32                         `json:"temp"`
+	Gasp               int32                         `json:"gasp"`
+	Hours              int32                         `json:"hours"`
+	ActiveDate         string                        `json:"activeDate"`
+	ActiveDateType     pasturePb.ActiveTimeType_Kind `json:"ActiveDateTimeType"`
+	IsShow             pasturePb.IsShow_Kind         `json:"isShow"`
+	ReceiveNumber      string                        `json:"receiveNumber"`
+	ShortReceiveNumber string                        `json:"shortReceiveNumber"`
+	CreatedAt          int64                         `json:"createdAt"`
+	UpdatedAt          int64                         `json:"updatedAt"`
+}
+
+func (s *NeckRingOriginal) TableName() string {
+	return "neck_ring_original"
+}

+ 0 - 30
model/neck_ring_original_data.go

@@ -1,30 +0,0 @@
-package model
-
-import pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
-
-type NeckRingOriginalData struct {
-	Id            int64                 `json:"id"`
-	SoftVer       int64                 `json:"softVer"`
-	Uuid          string                `json:"uuid"`
-	OriginFrameId int32                 `json:"originFrameId"`
-	FrameId       int32                 `json:"frameId"`
-	CowId         string                `json:"cowId"`
-	Csq           int64                 `json:"csq"`
-	Temp          int64                 `json:"temp"`
-	Imei          string                `json:"imei" `
-	Active        int32                 `json:"active"`
-	InActive      int32                 `json:"inactive"`
-	RuMina        int32                 `json:"ruMina"`
-	Intake        int32                 `json:"intake"`
-	Gasp          int32                 `json:"gasp"`
-	Other         int32                 `json:"other"`
-	ReMain        int32                 `json:"remain"`
-	ActiveTime    string                `json:"activeTime"`
-	IsShow        pasturePb.IsShow_Kind `json:"isShow"`
-	CreatedAt     int64                 `json:"createdAt"`
-	UpdatedAt     int64                 `json:"updatedAt"`
-}
-
-func (s *NeckRingOriginalData) TableName() string {
-	return "neck_ring_original_data"
-}

+ 32 - 0
model/neck_ring_unregist.go

@@ -0,0 +1,32 @@
+package model
+
+import pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
+
+type NeckRingUnRegister struct {
+	Id             int64                 `json:"id"`
+	Uuid           string                `json:"uuid"`
+	FrameId        int32                 `json:"frameId"`
+	Low            int32                 `json:"low"`
+	High           int32                 `json:"high"`
+	Rumina         int32                 `json:"rumina"`
+	Active         int32                 `json:"active"`
+	Intake         int32                 `json:"intake"`
+	Inactive       int32                 `json:"inactive"`
+	Other          int32                 `json:"other"`
+	Voltage        int32                 `json:"voltage"`
+	Upper          int32                 `json:"upper"`
+	Version        int32                 `json:"version"`
+	Sign           int32                 `json:"sign"`
+	Remain         int32                 `json:"remain"`
+	Feed           int32                 `json:"feed"`
+	Imei           string                `json:"imei" `
+	Temp           int32                 `json:"temp"`
+	ActiveDateTime string                `json:"activeDateTime"`
+	IsShow         pasturePb.IsShow_Kind `json:"isShow"`
+	CreatedAt      int64                 `json:"createdAt"`
+	UpdatedAt      int64                 `json:"updatedAt"`
+}
+
+func (s *NeckRingUnRegister) TableName() string {
+	return "neck_ring_unregister"
+}

+ 1 - 0
module/backend/calendar.go

@@ -387,6 +387,7 @@ func (s *StoreEntry) MatingCowList(ctx context.Context, req *pasturePb.ItemsRequ
 	count := int64(0)
 	pref := s.DB.Table(fmt.Sprintf("%s as a", new(model.EventMating).TableName())).
 		Select("a.id,a.cow_id,a.status,b.breed_status,b.cow_type,b.pen_id,b.day_age,b.calving_age,b.abortion_age,c.name as pen_name").
+		Joins("left join cow as b on a.cow_id = b.id").
 		Joins("left join pen as c on a.pen_id = c.id").
 		Where("a.status = ?", pasturePb.IsShow_No)
 

+ 1 - 1
module/crontab/interface.go

@@ -34,5 +34,5 @@ type Crontab interface {
 	UpdateSameTime() error
 	SystemBasicCrontab() error
 	CowPregnant() error
-	NeckRingData() error
+	NeckRingMergeData() error
 }

+ 34 - 17
module/crontab/neck_ring.go

@@ -1,6 +1,7 @@
 package crontab
 
 import (
+	"fmt"
 	"kpt-pasture/config"
 	"kpt-pasture/model"
 
@@ -8,17 +9,17 @@ import (
 	"gitee.com/xuyiping_admin/pkg/xerr"
 )
 
-func (e *Entry) NeckRingData() error {
+func (e *Entry) NeckRingMergeData() error {
 	cfg := config.Options()
 	limit := cfg.NeckRingLimit
 	if limit <= 0 {
 		limit = 10000
 	}
-	neckRingList := make([]*model.NeckRingOriginalData, 0)
-	if err := e.DB.Model(new(model.NeckRingOriginalData)).
-		Where("is_show = ?", pasturePb.IsShow_No).
-		Order("id asc").Limit(int(limit)).
-		Find(&neckRingList).Error; err != nil {
+
+	neckRingList := make([]*model.NeckRingOriginal, 0)
+	if err := e.DB.Model(new(model.NeckRingOriginal)).
+		Where("h1.is_show = ?", pasturePb.IsShow_No).
+		Find(&neckRingList).Limit(int(limit)).Error; err != nil {
 		return xerr.WithStack(err)
 	}
 
@@ -30,24 +31,40 @@ func (e *Entry) NeckRingData() error {
 	// 更新已处理过的id
 	defer func() {
 		if len(neckRingIds) > 0 {
-			e.DB.Model(new(model.NeckRingOriginalData)).
+			e.DB.Model(new(model.NeckRingOriginal)).
 				Where("id IN ?", neckRingIds).
 				Update("is_show", pasturePb.IsShow_Ok)
 		}
 	}()
+
+	originalMapData := make(map[string]*model.NeckRingOriginal)
+	// 合并成2个小时的
 	for i, v := range neckRingList {
-		neckRingIds[i] = v.Id
-		if v.Imei == "" {
+		if v.ActiveDate == "" {
 			continue
 		}
-
-	}
-	return nil
-}
-
-func (e *Entry) NeckRingOriginalDataMerge(dataList []*model.NeckRingLog) {
-	if len(dataList) <= 0 {
-		return
+		neckRingIds[i] = v.Id
+		if _, ok := originalMapData[fmt.Sprintf("%s-%s", v.Imei, v.ActiveDate)]; ok {
+			if v.ActiveDateType == pasturePb.ActiveTimeType_Twenty_Minutes {
+				originalMapData[fmt.Sprintf("%s-%s", v.Imei, v.ActiveDate)].Remain += v.Remain
+				originalMapData[fmt.Sprintf("%s-%s", v.Imei, v.ActiveDate)].Intake += v.Intake
+				originalMapData[fmt.Sprintf("%s-%s", v.Imei, v.ActiveDate)].Inactive += v.Inactive
+				originalMapData[fmt.Sprintf("%s-%s", v.Imei, v.ActiveDate)].Gasp += v.Gasp
+				originalMapData[fmt.Sprintf("%s-%s", v.Imei, v.ActiveDate)].Other += v.Other
+				originalMapData[fmt.Sprintf("%s-%s", v.Imei, v.ActiveDate)].Active += v.Active
+			}
+			if v.ActiveDateType == pasturePb.ActiveTimeType_Two_Hours {
+				originalMapData[fmt.Sprintf("%s-%s", v.Imei, v.ActiveDate)].Remain *= 6
+				originalMapData[fmt.Sprintf("%s-%s", v.Imei, v.ActiveDate)].Intake *= 6
+				originalMapData[fmt.Sprintf("%s-%s", v.Imei, v.ActiveDate)].Inactive *= 6
+				originalMapData[fmt.Sprintf("%s-%s", v.Imei, v.ActiveDate)].Gasp *= 6
+				originalMapData[fmt.Sprintf("%s-%s", v.Imei, v.ActiveDate)].Other *= 6
+				originalMapData[fmt.Sprintf("%s-%s", v.Imei, v.ActiveDate)].Active *= 6
+			}
+		} else {
+			originalMapData[fmt.Sprintf("%s-%s", v.Imei, v.ActiveDate)] = v
+		}
 	}
 
+	return nil
 }

+ 1 - 1
module/mqtt/interface.go

@@ -24,5 +24,5 @@ type Entry struct {
 }
 
 type Exec interface {
-	Subscribe(ctx context.Context, msg []byte) error
+	Subscribe(ctx context.Context, msg <-chan []byte)
 }

+ 27 - 0
module/mqtt/model.go

@@ -0,0 +1,27 @@
+package mqtt
+
+type Behavior struct {
+	UUID      string `json:"uuid"`
+	ECowId    string `json:"ecowid"`
+	FrameId   int32  `json:"frameid"`
+	High      int32  `json:"High"`
+	Intake    int32  `json:"Intake"`
+	RuMina    int32  `json:"Rumina"`
+	Other     int32  `json:"Other"`
+	Activitys int32  `json:"activitys"`
+	Inactive  int32  `json:"inactive"`
+	Sver      int32  `json:"Sver"`
+	Remain    int32  `json:"Remain"`
+	RFRssi    int32  `json:"RFRssi"`
+	STATUS    int32  `json:"STATUS"`
+	BAT       int32  `json:"BAT"`
+	Imei      string `json:"imei"`
+	Gasp      int32  `json:"gasp"`
+}
+
+type NeckRingWrapper struct {
+	Type     string `json:"type"`
+	NeckRing struct {
+		NeckPck []*Behavior `json:"neck"` // neck_pck neck
+	} `json:"NeckRing"`
+}

+ 170 - 1
module/mqtt/mqtt.go

@@ -2,9 +2,178 @@ package mqtt
 
 import (
 	"context"
+	"encoding/json"
+	"kpt-pasture/model"
+	"kpt-pasture/util"
+	"os"
+	"os/signal"
+	"strconv"
+	"sync"
+	"syscall"
+	"time"
+
+	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
+
+	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
+	"gitee.com/xuyiping_admin/pkg/xerr"
+	"go.uber.org/zap"
+	"gorm.io/gorm"
 )
 
-func (s *Entry) Subscribe(ctx context.Context, msg []byte) error {
+type DataInsertNeckRingLog struct {
+	NeckRingOriginalData   []*model.NeckRingOriginal
+	NeckRingErrorData      []*model.NeckRingOriginal
+	NeckRingUnRegisterData []*model.NeckRingOriginal
+	Mx                     *sync.RWMutex
+}
+
+var FrameId = map[int32]int32{
+	1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 8: 8,
+	11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 16, 18: 18,
+	21: 21, 22: 22, 23: 23, 24: 24, 25: 25, 26: 26, 28: 28,
+	31: 31, 32: 32, 33: 33, 34: 34, 35: 35, 36: 36, 38: 38,
+	41: 41, 42: 42, 43: 43, 44: 44, 45: 45, 46: 46, 48: 48,
+	51: 51, 52: 52, 53: 53, 54: 54, 55: 55, 56: 56, 58: 58,
+	61: 61, 62: 62, 63: 63, 64: 64, 65: 65, 66: 66, 68: 68,
+	71: 71, 72: 72, 73: 73, 74: 74, 75: 75, 76: 76, 78: 78,
+	81: 81, 82: 82, 83: 83, 84: 84, 85: 85, 86: 86, 88: 88,
+	91: 91, 92: 92, 93: 93, 94: 94, 95: 95, 96: 96, 98: 98,
+	101: 101, 102: 102, 103: 103, 104: 104, 105: 105, 106: 106, 108: 108,
+	111: 111, 112: 112, 113: 113, 114: 114, 115: 115, 116: 116, 118: 118,
+}
+
+func (e *Entry) Subscribe(ctx context.Context, msg <-chan []byte) {
+	DSMLog := &DataInsertNeckRingLog{
+		NeckRingOriginalData:   make([]*model.NeckRingOriginal, 0),
+		NeckRingErrorData:      make([]*model.NeckRingOriginal, 0),
+		NeckRingUnRegisterData: make([]*model.NeckRingOriginal, 0),
+		Mx:                     &sync.RWMutex{},
+	}
+
+	batchSize := 20
+	batchList := make([]*model.NeckRingOriginal, 0, batchSize)
+
+	sc := make(chan os.Signal, 1)
+	signal.Notify(sc, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+
+	// 设置5分钟超时
+	tc := time.After(5 * time.Minute)
+
+	for {
+		select {
+		case data := <-msg:
+			newData, err := e.MsgDataFormat(data)
+			if err != nil {
+				continue
+			}
+			batchList = append(batchList, newData...)
+			if len(batchList) >= batchSize {
+				DSMLog.Mx.Lock()
+				for _, batch := range batchList {
+					// 异常脖环数据
+					if _, ok := FrameId[batch.FrameId]; !ok {
+						DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, batch)
+						continue
+					}
 
+					// 未佩戴的脖环数据
+					if ok := e.NeckRingIsBind(batch.Imei); !ok {
+						DSMLog.NeckRingUnRegisterData = append(DSMLog.NeckRingUnRegisterData, batch)
+						continue
+					}
+					// 正常脖环数据
+					DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, batch)
+				}
+
+				if err = e.CreatedData(DSMLog); err != nil {
+					zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
+				}
+
+				DSMLog.Mx.Unlock()
+				DSMLog.NeckRingUnRegisterData = make([]*model.NeckRingOriginal, 0)
+				DSMLog.NeckRingErrorData = make([]*model.NeckRingOriginal, 0)
+				DSMLog.NeckRingOriginalData = make([]*model.NeckRingOriginal, 0)
+				batchList = batchList[:0]
+			}
+			// 优雅退出
+		case <-sc:
+			if err := e.CreatedData(DSMLog); err != nil {
+				zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
+			}
+		case <-tc:
+			if err := e.CreatedData(DSMLog); err != nil {
+				zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
+			}
+			zaplog.Info("subMsgChan-os", zap.Any("success", DSMLog.NeckRingOriginalData))
+			DSMLog.NeckRingErrorData = make([]*model.NeckRingOriginal, 0)
+			DSMLog.NeckRingOriginalData = make([]*model.NeckRingOriginal, 0)
+			DSMLog.NeckRingUnRegisterData = make([]*model.NeckRingOriginal, 0)
+		}
+	}
+}
+
+func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
+	if err := e.DB.Transaction(func(tx *gorm.DB) error {
+		if len(DSMLog.NeckRingUnRegisterData) > 0 {
+			if err := e.DB.Create(DSMLog.NeckRingErrorData).Error; err != nil {
+				return xerr.WithStack(err)
+			}
+		}
+
+		if len(DSMLog.NeckRingOriginalData) > 0 {
+			if err := e.DB.Create(DSMLog.NeckRingOriginalData).Error; err != nil {
+				return xerr.WithStack(err)
+			}
+		}
+
+		if len(DSMLog.NeckRingUnRegisterData) > 0 {
+			if err := e.DB.Create(DSMLog.NeckRingUnRegisterData).Error; err != nil {
+				return xerr.WithStack(err)
+			}
+		}
+		return nil
+	}); err != nil {
+		return xerr.WithStack(err)
+	}
 	return nil
 }
+
+func (e *Entry) MsgDataFormat(msg []byte) ([]*model.NeckRingOriginal, error) {
+	neckLog := &NeckRingWrapper{}
+	if err := json.Unmarshal(msg, neckLog); err != nil {
+		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
+	}
+
+	batchList := make([]*model.NeckRingOriginal, 0)
+	if len(neckLog.NeckRing.NeckPck) > 0 {
+		for _, v := range neckLog.NeckRing.NeckPck {
+			// 存储到数据库
+			activeDate, hours := util.GetNeckRingActiveTimer(v.FrameId)
+			voltage, _ := strconv.ParseInt(strconv.FormatInt(int64(v.BAT), 16), 10, 64)
+			activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
+			if v.FrameId%10 == 8 {
+				activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
+			}
+			newData := &model.NeckRingOriginal{
+				Uuid:           v.UUID,
+				Imei:           v.ECowId,
+				ActiveDate:     activeDate,
+				Hours:          int32(hours),
+				FrameId:        v.FrameId,
+				Rumina:         v.RuMina,
+				Intake:         v.Intake,
+				Inactive:       v.Inactive,
+				Other:          v.Other,
+				High:           v.Activitys,
+				Active:         v.High,
+				Voltage:        int32(voltage),
+				Version:        v.Sver,
+				Remain:         v.Remain,
+				ReceiveNumber:  v.Imei,
+				ActiveDateType: activeDateTimeType,
+			}
+			batchList = append(batchList, newData)
+		}
+	}
+	return batchList, nil
+}

+ 19 - 0
module/mqtt/sql.go

@@ -0,0 +1,19 @@
+package mqtt
+
+import (
+	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
+)
+
+// NeckRingIsBind 脖环是否绑定
+func (e *Entry) NeckRingIsBind(number string) bool {
+	var count int64 = 0
+	if err := e.DB.Where("number = ?", number).
+		Where("status != ?", pasturePb.NeckRingStatus_Unbind).
+		Count(&count).Error; err != nil {
+		return false
+	}
+	if count > 0 {
+		return true
+	}
+	return false
+}

+ 4 - 10
service/mqtt/sub.go

@@ -3,14 +3,7 @@ package mqtt
 import (
 	"fmt"
 	"kpt-pasture/config"
-	"kpt-pasture/model"
-	"kpt-pasture/util"
-	"strconv"
-	"strings"
 	"sync"
-	"time"
-
-	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
 
 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
 	golangMqtt "github.com/eclipse/paho.mqtt.golang"
@@ -74,7 +67,7 @@ func (d *DataEventEntry) SubMsg(conf config.MqttSetting, client golangMqtt.Clien
 }
 
 func (d *DataEventEntry) ProcessMessages(msg []byte) {
-	neckRingOriginalData, err := d.MsgDataFormat(msg)
+	/*neckRingOriginalData, err := d.MsgDataFormat(msg)
 	if err != nil {
 		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
 		return
@@ -124,10 +117,10 @@ func (d *DataEventEntry) ProcessMessages(msg []byte) {
 		Updates(map[string]interface{}{
 			"status":       neckRingStatus,
 			"error_reason": errorReason,
-		})
+		})*/
 }
 
-func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.NeckRingOriginalData, error) {
+/*func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.NeckRingOriginalData, error) {
 	msgData := make(map[string]interface{})
 	pairs := strings.Split(util.MsgFormat(string(msg)), " ")
 	for _, pair := range pairs {
@@ -258,3 +251,4 @@ func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.NeckRingOriginalData,
 		IsShow:        pasturePb.IsShow_No,
 	}, nil
 }
+*/

+ 40 - 0
util/util.go

@@ -317,3 +317,43 @@ func MsgFormat(input string) string {
 	// 使用正则表达式替换所有匹配的部分
 	return re.ReplaceAllString(input, ":")
 }
+
+/*
+GetNeckRingActiveTimer
+1. frameId值如果是:1到6代表每天的0点到2点,11-16 代表每天的2点到4点, 21-26 代表每天的4点到6点,31-36代表每天的6点到8点,
+41-46 代表每天的8点到10点,51-56代表每天的10点到12点,61-66代表每天的12-14点,71-76代表每天的14-16点,81-86代表每天的16-18点,
+91-96代表每天的18-20点,101-106代表每天的20-22点,111-116代表每天的22-24点。其中每天数字代表20分钟。如果frameId大于接受时间点,就代表frameId是昨天的。
+2. 如果farmId取值出现8,18,28,3848,58,68,78,88,98,108,118数字分别代表2个小时,从0-2点开始,以此类推。
+帮我根据frameId值,获取对应的时间点
+todo 116和118有问题,需要处理
+*/
+func GetNeckRingActiveTimer(frameId int32) (dateTime string, hours int) {
+	if frameId < 0 || frameId > 118 {
+		return "", 0
+	}
+	nowTime := time.Now()
+	currHour := nowTime.Hour()
+	// 处理2小时的特殊 farmId
+	specialHours := map[int]int{
+		8: 2, 18: 4, 28: 6, 38: 8, 48: 10, 58: 12, 68: 14, 78: 16, 88: 18, 98: 20, 108: 22, 118: 0,
+	}
+	hours, ok := specialHours[int(frameId)]
+	startHour := hours
+	if ok {
+		if hours > currHour {
+			startHour -= 24
+		}
+		dateTime = time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), startHour, 0, 0, 0, nowTime.Location()).Format(Layout)
+		return
+	}
+
+	hours = int(math.Floor(float64(frameId)/10) * 2)
+	units := int(frameId % 10)
+	hours += units / 3
+	startHour = hours
+	if hours > currHour {
+		startHour -= 24
+	}
+	dateTime = time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), startHour, 0, 0, 0, nowTime.Location()).Format(Layout)
+	return
+}

+ 101 - 8
util/util_test.go

@@ -2,7 +2,9 @@ package util
 
 import (
 	"fmt"
+	"math"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 )
@@ -413,16 +415,107 @@ func TestDaysBetween(t *testing.T) {
 	}
 }
 
-func Test_demo(t *testing.T) {
-	ids := make([]int64, 0)
-	defer func() {
-		fmt.Println(ids)
-	}()
-	for i := 0; i < 10; i++ {
-		ids = append(ids, int64(i))
+func TestGetNeckRingActiveTimer(t *testing.T) {
+	nowTime := time.Now().Format(Layout)
+	tests := struct {
+		frameId  []int32
+		dateTime []string
+		hours    []int32
+	}{
+		frameId: []int32{
+			1, 2, 3, 4, 5, 6, 8,
+			11, 12, 13, 14, 15, 16, 18,
+			21, 22, 23, 24, 25, 26, 28,
+			31, 32, 33, 34, 35, 36, 38,
+			41, 42, 43, 44, 45, 46, 48,
+			51, 52, 53, 54, 55, 56, 58,
+			61, 62, 63, 64, 65, 66, 68,
+			71, 72, 73, 74, 75, 76, 78,
+			81, 82, 83, 84, 85, 86, 88,
+			91, 92, 93, 94, 95, 96, 98,
+			101, 102, 103, 104, 105, 106, 108,
+			111, 112, 113, 114, 115, 116, 118,
+		},
+		hours: []int32{
+			0, 0, 1, 1, 1, 2, 2,
+			2, 2, 3, 3, 3, 4, 4,
+			4, 4, 5, 5, 5, 6, 6,
+			6, 6, 7, 7, 7, 8, 8,
+			8, 8, 9, 9, 9, 10, 10,
+			10, 10, 11, 11, 11, 12, 12,
+			12, 12, 13, 13, 13, 14, 14,
+			14, 14, 15, 15, 15, 16, 16,
+			16, 16, 17, 17, 17, 18, 18,
+			18, 18, 19, 19, 19, 20, 20,
+			20, 20, 21, 21, 21, 22, 22,
+			22, 22, 23, 23, 23, 24, 0,
+		},
+		dateTime: []string{
+			fmt.Sprintf("%s 00:20:00", nowTime), fmt.Sprintf("%s 00:40:00", nowTime), fmt.Sprintf("%s 01:00:00", nowTime),
+			fmt.Sprintf("%s 01:20:00", nowTime), fmt.Sprintf("%s 01:40:00", nowTime), fmt.Sprintf("%s 02:00:00", nowTime),
+			fmt.Sprintf("%s 00:00:00", nowTime),
+			fmt.Sprintf("%s 02:20:00", nowTime), fmt.Sprintf("%s 02:40:00", nowTime), fmt.Sprintf("%s 03:00:00", nowTime),
+			fmt.Sprintf("%s 03:20:00", nowTime), fmt.Sprintf("%s 03:40:00", nowTime), fmt.Sprintf("%s 04:00:00", nowTime),
+			fmt.Sprintf("%s 02:00:00", nowTime),
+			fmt.Sprintf("%s 04:20:00", nowTime), fmt.Sprintf("%s 04:40:00", nowTime), fmt.Sprintf("%s 05:00:00", nowTime),
+			fmt.Sprintf("%s 05:20:00", nowTime), fmt.Sprintf("%s 05:40:00", nowTime), fmt.Sprintf("%s 06:00:00", nowTime),
+			fmt.Sprintf("%s 04:00:00", nowTime),
+			fmt.Sprintf("%s 06:20:00", nowTime), fmt.Sprintf("%s 06:40:00", nowTime), fmt.Sprintf("%s 07:00:00", nowTime),
+			fmt.Sprintf("%s 07:20:00", nowTime), fmt.Sprintf("%s 07:40:00", nowTime), fmt.Sprintf("%s 08:00:00", nowTime),
+			fmt.Sprintf("%s 06:00:00", nowTime),
+			fmt.Sprintf("%s 08:20:00", nowTime), fmt.Sprintf("%s 08:40:00", nowTime), fmt.Sprintf("%s 09:00:00", nowTime),
+			fmt.Sprintf("%s 09:20:00", nowTime), fmt.Sprintf("%s 09:40:00", nowTime), fmt.Sprintf("%s 10:00:00", nowTime),
+			fmt.Sprintf("%s 08:00:00", nowTime),
+			fmt.Sprintf("%s 10:20:00", nowTime), fmt.Sprintf("%s 10:40:00", nowTime), fmt.Sprintf("%s 11:00:00", nowTime),
+			fmt.Sprintf("%s 11:20:00", nowTime), fmt.Sprintf("%s 11:40:00", nowTime), fmt.Sprintf("%s 12:00:00", nowTime),
+			fmt.Sprintf("%s 10:00:00", nowTime),
+			fmt.Sprintf("%s 12:20:00", nowTime), fmt.Sprintf("%s 12:40:00", nowTime), fmt.Sprintf("%s 13:00:00", nowTime),
+			fmt.Sprintf("%s 13:20:00", nowTime), fmt.Sprintf("%s 13:40:00", nowTime), fmt.Sprintf("%s 14:00:00", nowTime),
+			fmt.Sprintf("%s 12:00:00", nowTime),
+			fmt.Sprintf("%s 14:20:00", nowTime), fmt.Sprintf("%s 14:40:00", nowTime), fmt.Sprintf("%s 15:00:00", nowTime),
+			fmt.Sprintf("%s 15:20:00", nowTime), fmt.Sprintf("%s 15:40:00", nowTime), fmt.Sprintf("%s 16:00:00", nowTime),
+			fmt.Sprintf("%s 14:00:00", nowTime),
+			fmt.Sprintf("%s 16:20:00", nowTime), fmt.Sprintf("%s 16:40:00", nowTime), fmt.Sprintf("%s 17:00:00", nowTime),
+			fmt.Sprintf("%s 17:20:00", nowTime), fmt.Sprintf("%s 17:40:00", nowTime), fmt.Sprintf("%s 18:00:00", nowTime),
+			fmt.Sprintf("%s 16:00:00", nowTime),
+			fmt.Sprintf("%s 18:20:00", nowTime), fmt.Sprintf("%s 18:40:00", nowTime), fmt.Sprintf("%s 19:00:00", nowTime),
+			fmt.Sprintf("%s 19:20:00", nowTime), fmt.Sprintf("%s 19:40:00", nowTime), fmt.Sprintf("%s 20:00:00", nowTime),
+			fmt.Sprintf("%s 18:00:00", nowTime),
+			fmt.Sprintf("%s 20:20:00", nowTime), fmt.Sprintf("%s 20:40:00", nowTime), fmt.Sprintf("%s 21:00:00", nowTime),
+			fmt.Sprintf("%s 21:20:00", nowTime), fmt.Sprintf("%s 21:40:00", nowTime), fmt.Sprintf("%s 22:00:00", nowTime),
+			fmt.Sprintf("%s 22:00:00", nowTime),
+			fmt.Sprintf("%s 22:20:00", nowTime), fmt.Sprintf("%s 22:40:00", nowTime), fmt.Sprintf("%s 23:00:00", nowTime),
+			fmt.Sprintf("%s 23:20:00", nowTime), fmt.Sprintf("%s 23:40:00", nowTime), fmt.Sprintf("%s 24:00:00", nowTime),
+			fmt.Sprintf("%s 24:00:00", nowTime),
+		},
+	}
+
+	for i, frameId := range tests.frameId {
+		got, hours := GetNeckRingActiveTimer(frameId)
+		t.Logf("frameId: %d, hours: %d, got :%s", frameId, tests.hours[i], got)
+		//assert.Equal(t, got, tests.dateTime[i])
+		assert.Equal(t, int32(hours), tests.hours[i])
 	}
+}
+
+func Test_demo(t *testing.T) {
+	ids := []int32{1, 2, 3, 4, 5, 6, 11, 12, 13, 14, 15, 16, 21, 22, 23, 24, 25, 26,
+		31, 32, 33, 34, 35, 36, 41, 42, 43, 44, 45, 46, 51, 52, 53, 54, 55, 56,
+		61, 62, 63, 64, 65, 66, 71, 72, 73, 74, 75, 76, 81, 82, 83, 84, 85, 86,
+		91, 92, 93, 94, 95, 96, 101, 102, 103, 104, 105, 106, 111, 112, 113, 114, 115, 116}
+
+	nowTime := time.Now()
+	currHour := nowTime.Hour()
 
 	for _, v := range ids {
-		fmt.Println("===v==", v)
+		hours := int(math.Floor(float64(v)/10) * 2)
+		units := int(v % 10)
+		minutes := units * 20 % 60
+		hours += units / 3
+		if hours > currHour {
+			hours -= 24
+		}
+		activeTime := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), hours, minutes, 0, 0, time.Now().Location()).Format(LayoutTime)
+		fmt.Println(v, activeTime)
 	}
 }