Parcourir la source

drone: update

Yi il y a 2 mois
Parent
commit
859ff4b7ce

+ 1 - 1
.drone.yml

@@ -30,7 +30,7 @@ steps:
         from_secret: aliyuncs_password
         from_secret: aliyuncs_password
       repo: registry.cn-hangzhou.aliyuncs.com/kpt-event/kpt-pasture
       repo: registry.cn-hangzhou.aliyuncs.com/kpt-event/kpt-pasture
       registry: registry.cn-hangzhou.aliyuncs.com
       registry: registry.cn-hangzhou.aliyuncs.com
-      tags: [ test ]
+      tags: [ http-test,mqtt-test ]
 
 
 trigger:
 trigger:
   branch:
   branch:

+ 2 - 2
README.md

@@ -41,7 +41,7 @@ todo列表:
 - [x] 所有事件录入梳理【批量录入,excel导入,信息人员与操作人员统一规范】
 - [x] 所有事件录入梳理【批量录入,excel导入,信息人员与操作人员统一规范】
 - [ ] 药品优化成药品名称关联生产商
 - [ ] 药品优化成药品名称关联生产商
 - [x] 框架logrus日志优化【未按照指定天数的日志自动删除,待验证】
 - [x] 框架logrus日志优化【未按照指定天数的日志自动删除,待验证】
-- [ ] 犊牛的牛只品种是根据母牛的品种来确定,还是根据公牛来确定?【目前是根据母牛品种来确定】
+- [x] 犊牛的牛只品种是根据母牛的品种来确定,还是根据公牛来确定?【目前是根据母牛品种来确定】
 
 
 脖环发情算法梳理:
 脖环发情算法梳理:
-- [ ] 处理异常上报数据(frameid > 12)
+- [x] 处理异常上报数据(frameid > 12)

+ 1 - 1
cmd/mqtt.go

@@ -10,6 +10,6 @@ var MqttCmd = &cobra.Command{
 	Use:   "mqtt",
 	Use:   "mqtt",
 	Short: "mqtt server",
 	Short: "mqtt server",
 	Run: func(cmd *cobra.Command, args []string) {
 	Run: func(cmd *cobra.Command, args []string) {
-		dep.DIMqttService().MqttServer.Run()
+		dep.DIMqttService()
 	},
 	},
 }
 }

+ 0 - 2
dep/dep.go

@@ -7,7 +7,6 @@ import (
 	"kpt-pasture/module/crontab"
 	"kpt-pasture/module/crontab"
 	moduleMqtt "kpt-pasture/module/mqtt"
 	moduleMqtt "kpt-pasture/module/mqtt"
 	"kpt-pasture/service/asynqsvc"
 	"kpt-pasture/service/asynqsvc"
-	"kpt-pasture/service/mqtt"
 	"kpt-pasture/service/redis"
 	"kpt-pasture/service/redis"
 	"kpt-pasture/service/sso"
 	"kpt-pasture/service/sso"
 	"kpt-pasture/service/wechat"
 	"kpt-pasture/service/wechat"
@@ -38,7 +37,6 @@ func Options() []di.HubOption {
 		asynq.Module,
 		asynq.Module,
 		redis.Module,
 		redis.Module,
 		crontab.Module,
 		crontab.Module,
-		mqtt.Module,
 		moduleMqtt.Module,
 		moduleMqtt.Module,
 	}
 	}
 }
 }

+ 16 - 4
dep/di_mqtt.go

@@ -1,22 +1,34 @@
 package dep
 package dep
 
 
 import (
 import (
+	"kpt-pasture/config"
+	mqttHandle "kpt-pasture/module/mqtt"
 	"kpt-pasture/service/mqtt"
 	"kpt-pasture/service/mqtt"
 
 
 	"go.uber.org/dig"
 	"go.uber.org/dig"
 )
 )
 
 
-func DIMqttService() (out MqttDependency) {
+func DIMqttService() (out mqtt.IMqttServer) {
 	container := DI()
 	container := DI()
-	if err := container.Invoke(func(c MqttDependency) { out = c }); err != nil {
+	if err := container.Provide(MqttHandel); err != nil {
+		panic(err)
+	}
+	if err := container.Invoke(func(c mqtt.IMqttServer) { out = c }); err != nil {
 		panic(err)
 		panic(err)
 	}
 	}
 	return
 	return
 }
 }
 
 
-// MqttDependency 依赖注入结构体
+// MqttHandel 相关消费
+func MqttHandel(dep MqttDependency) mqtt.IMqttServer {
+	cfg := config.Options()
+	sev := mqtt.NewServer(cfg)
+	sev.Run(dep.MqttHub)
+	return sev
+}
+
 type MqttDependency struct {
 type MqttDependency struct {
 	dig.In
 	dig.In
 
 
-	MqttServer mqtt.IMqttServer
+	MqttHub mqttHandle.Entry // 处理数据
 }
 }

+ 1 - 1
go.mod

@@ -3,7 +3,7 @@ module kpt-pasture
 go 1.17
 go 1.17
 
 
 require (
 require (
-	gitee.com/xuyiping_admin/go_proto v0.0.0-20250103030611-2b0c271353c7
+	gitee.com/xuyiping_admin/go_proto v0.0.0-20250103092939-8816c3aabc06
 	gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b
 	gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/eclipse/paho.mqtt.golang v1.4.3
 	github.com/eclipse/paho.mqtt.golang v1.4.3

+ 2 - 0
go.sum

@@ -110,6 +110,8 @@ gitee.com/xuyiping_admin/go_proto v0.0.0-20250102080429-bbf8aeeb608c h1:Fqi07Tnc
 gitee.com/xuyiping_admin/go_proto v0.0.0-20250102080429-bbf8aeeb608c/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
 gitee.com/xuyiping_admin/go_proto v0.0.0-20250102080429-bbf8aeeb608c/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
 gitee.com/xuyiping_admin/go_proto v0.0.0-20250103030611-2b0c271353c7 h1:mk2G7/x2g2WLbCFw7ThZ7F5HOJ9AfAidG7ilhit67bM=
 gitee.com/xuyiping_admin/go_proto v0.0.0-20250103030611-2b0c271353c7 h1:mk2G7/x2g2WLbCFw7ThZ7F5HOJ9AfAidG7ilhit67bM=
 gitee.com/xuyiping_admin/go_proto v0.0.0-20250103030611-2b0c271353c7/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
 gitee.com/xuyiping_admin/go_proto v0.0.0-20250103030611-2b0c271353c7/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
+gitee.com/xuyiping_admin/go_proto v0.0.0-20250103092939-8816c3aabc06 h1:9pYtEuYYvFb1Zak/bMe7MFzElFlwya45G7mrco5bNp0=
+gitee.com/xuyiping_admin/go_proto v0.0.0-20250103092939-8816c3aabc06/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
 gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b h1:w05MxH7yqveRlaRbxHhbif5YjPrJFodRPfOjYhXn7Zk=
 gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b h1:w05MxH7yqveRlaRbxHhbif5YjPrJFodRPfOjYhXn7Zk=
 gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b/go.mod h1:8tF25X6pE9WkFCczlNAC0K2mrjwKvhhp02I7o0HtDxY=
 gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b/go.mod h1:8tF25X6pE9WkFCczlNAC0K2mrjwKvhhp02I7o0HtDxY=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=

+ 1 - 1
http/handler/event/event_breed.go

@@ -211,7 +211,7 @@ func EstrusBatchMating(c *gin.Context) {
 		valid.Field(&req.CowIds, valid.Required),
 		valid.Field(&req.CowIds, valid.Required),
 		valid.Field(&req.BullNumber, valid.Required),
 		valid.Field(&req.BullNumber, valid.Required),
 		valid.Field(&req.OperationId, valid.Required),
 		valid.Field(&req.OperationId, valid.Required),
-		valid.Field(&req.Consumption, valid.Required),
+		valid.Field(&req.Quantity, valid.Required),
 	); err != nil {
 	); err != nil {
 		apierr.AbortBadRequest(c, http.StatusBadRequest, err)
 		apierr.AbortBadRequest(c, http.StatusBadRequest, err)
 		return
 		return

+ 2 - 0
model/event_mating.go

@@ -14,6 +14,7 @@ type EventMating struct {
 	DayAge            int32                           `json:"dayAge"`
 	DayAge            int32                           `json:"dayAge"`
 	Lact              int32                           `json:"lact"`
 	Lact              int32                           `json:"lact"`
 	PenId             int32                           `json:"penId"`
 	PenId             int32                           `json:"penId"`
+	PenName           string                          `json:"penName"`
 	CowType           pasturePb.CowType_Kind          `json:"cowType"`
 	CowType           pasturePb.CowType_Kind          `json:"cowType"`
 	CowKind           pasturePb.CowKind_Kind          `json:"cowKind"`
 	CowKind           pasturePb.CowKind_Kind          `json:"cowKind"`
 	CalvingAge        int32                           `json:"calvingAge"`
 	CalvingAge        int32                           `json:"calvingAge"`
@@ -89,6 +90,7 @@ func NewEventMating(pastureId int64, cow *Cow, planDay int64, exposeEstrusType p
 		CowId:            cow.Id,
 		CowId:            cow.Id,
 		Lact:             cow.Lact,
 		Lact:             cow.Lact,
 		PenId:            cow.PenId,
 		PenId:            cow.PenId,
+		PenName:          cow.PenName,
 		CowType:          cow.CowType,
 		CowType:          cow.CowType,
 		CowKind:          cow.CowKind,
 		CowKind:          cow.CowKind,
 		CalvingAt:        cow.LastMatingAt,
 		CalvingAt:        cow.LastMatingAt,

+ 3 - 1
model/frozen_semen_log.go

@@ -9,6 +9,7 @@ import (
 
 
 type FrozenSemenLog struct {
 type FrozenSemenLog struct {
 	Id            int64  `json:"id"`
 	Id            int64  `json:"id"`
+	PastureId     int64  `json:"pastureId"`
 	BullId        string `json:"bullId"`
 	BullId        string `json:"bullId"`
 	CowIds        string `json:"cowId"`
 	CowIds        string `json:"cowId"`
 	Quantity      int32  `json:"quantity"`
 	Quantity      int32  `json:"quantity"`
@@ -24,7 +25,7 @@ func (e *FrozenSemenLog) TableName() string {
 	return "frozen_semen_log"
 	return "frozen_semen_log"
 }
 }
 
 
-func NewEventFrozenSemenLog(req *pasturePb.EventMating) *FrozenSemenLog {
+func NewEventFrozenSemenLog(pastureId int64, req *pasturePb.EventMating) *FrozenSemenLog {
 	cowIds := ""
 	cowIds := ""
 	for _, v := range req.CowIds {
 	for _, v := range req.CowIds {
 		cowIds += fmt.Sprintf("%d,", v)
 		cowIds += fmt.Sprintf("%d,", v)
@@ -33,6 +34,7 @@ func NewEventFrozenSemenLog(req *pasturePb.EventMating) *FrozenSemenLog {
 		cowIds = strings.TrimRight(cowIds, ",")
 		cowIds = strings.TrimRight(cowIds, ",")
 	}
 	}
 	return &FrozenSemenLog{
 	return &FrozenSemenLog{
+		PastureId:     pastureId,
 		BullId:        req.FrozenSemenNumber,
 		BullId:        req.FrozenSemenNumber,
 		CowIds:        cowIds,
 		CowIds:        cowIds,
 		OperationId:   int64(req.OperationId),
 		OperationId:   int64(req.OperationId),

+ 2 - 2
module/backend/event_breed.go

@@ -421,7 +421,7 @@ func (s *StoreEntry) EstrusBatchMating(ctx context.Context, req *pasturePb.Event
 	eventMating := &pasturePb.EventMating{
 	eventMating := &pasturePb.EventMating{
 		CowIds:            cowIds,
 		CowIds:            cowIds,
 		FrozenSemenNumber: req.BullNumber,
 		FrozenSemenNumber: req.BullNumber,
-		FrozenSemenCount:  req.Consumption,
+		FrozenSemenCount:  req.Quantity,
 		OperationId:       req.OperationId,
 		OperationId:       req.OperationId,
 		Remarks:           req.Remarks,
 		Remarks:           req.Remarks,
 		MatingAt:          req.MatingAt,
 		MatingAt:          req.MatingAt,
@@ -555,7 +555,7 @@ func (s *StoreEntry) MatingCreate(ctx context.Context, req *pasturePb.EventMatin
 		}
 		}
 
 
 		// 创建冻精使用记录日志
 		// 创建冻精使用记录日志
-		itemFrozenSemenLog := model.NewEventFrozenSemenLog(req)
+		itemFrozenSemenLog := model.NewEventFrozenSemenLog(eventCheckModel.FrozenSemen.PastureId, req)
 		if err = tx.Create(itemFrozenSemenLog).Error; err != nil {
 		if err = tx.Create(itemFrozenSemenLog).Error; err != nil {
 			return xerr.WithStack(err)
 			return xerr.WithStack(err)
 		}
 		}

+ 1 - 3
module/crontab/interface.go

@@ -10,9 +10,7 @@ import (
 	"go.uber.org/dig"
 	"go.uber.org/dig"
 )
 )
 
 
-var Module = di.Options(
+var Module = di.Options(di.Provide(NewCrontab))
-	di.Provide(NewCrontab),
-)
 
 
 type Entry struct {
 type Entry struct {
 	dig.In
 	dig.In

+ 39 - 1
module/mqtt/handle.go

@@ -1,12 +1,15 @@
 package mqtt
 package mqtt
 
 
 import (
 import (
+	"encoding/json"
 	"kpt-pasture/model"
 	"kpt-pasture/model"
 	"kpt-pasture/util"
 	"kpt-pasture/util"
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
 	"sync"
 	"sync"
 
 
+	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
+
 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
 	"gitee.com/xuyiping_admin/pkg/xerr"
 	"gitee.com/xuyiping_admin/pkg/xerr"
 	"go.uber.org/zap"
 	"go.uber.org/zap"
@@ -82,7 +85,7 @@ func (e *Entry) NeckRingHandle(data []byte) {
 
 
 func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
 func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
 	if err := e.DB.Transaction(func(tx *gorm.DB) error {
 	if err := e.DB.Transaction(func(tx *gorm.DB) error {
-		if len(DSMLog.NeckRingUnRegisterData) > 0 {
+		if len(DSMLog.NeckRingErrorData) > 0 {
 			if err := e.DB.Create(DSMLog.NeckRingErrorData).Error; err != nil {
 			if err := e.DB.Create(DSMLog.NeckRingErrorData).Error; err != nil {
 				return xerr.WithStack(err)
 				return xerr.WithStack(err)
 			}
 			}
@@ -280,3 +283,38 @@ func (e *Entry) MsgDataFormat(msg []byte) *model.NeckRingOriginal {
 		Nccid:    nccId,
 		Nccid:    nccId,
 	}
 	}
 }
 }
+
+func (e *Entry) MsgDataFormat2(msg []byte) *model.NeckRingOriginal {
+	neckLog := &Behavior{}
+	if err := json.Unmarshal(msg, neckLog); err != nil {
+		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
+	}
+	if neckLog.Imei != "" {
+		// 存储到数据库
+		activeDate, hours := util.GetNeckRingActiveTimer(neckLog.FrameId)
+		voltage, _ := strconv.ParseInt(strconv.FormatInt(int64(neckLog.BAT), 16), 10, 64)
+		activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
+		if neckLog.FrameId%10 == 8 {
+			activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
+		}
+		return &model.NeckRingOriginal{
+			Uuid:           neckLog.UUID,
+			Imei:           neckLog.ECowId,
+			ActiveDate:     activeDate,
+			Hours:          int32(hours),
+			FrameId:        neckLog.FrameId,
+			Rumina:         neckLog.RuMina,
+			Intake:         neckLog.Intake,
+			Inactive:       neckLog.Inactive,
+			Other:          neckLog.Other,
+			High:           neckLog.Activitys,
+			Active:         neckLog.High,
+			Voltage:        int32(voltage),
+			Version:        neckLog.Sver,
+			Remain:         neckLog.Remain,
+			ReceiveNumber:  neckLog.Imei,
+			ActiveDateType: activeDateTimeType,
+		}
+	}
+	return nil
+}

+ 27 - 1
module/mqtt/model.go

@@ -1,6 +1,6 @@
 package mqtt
 package mqtt
 
 
-type Behavior struct {
+type NewBehavior struct {
 	SoftVer  int32   `json:"soft_ver"`
 	SoftVer  int32   `json:"soft_ver"`
 	UUID     string  `json:"uuid"`
 	UUID     string  `json:"uuid"`
 	FrameId  int32   `json:"frameid"`
 	FrameId  int32   `json:"frameid"`
@@ -17,3 +17,29 @@ type Behavior struct {
 	Other    int32   `json:"other"`
 	Other    int32   `json:"other"`
 	Remain   int32   `json:"Remain"`
 	Remain   int32   `json:"Remain"`
 }
 }
+
+type Behavior struct {
+	UUID      string `json:"uuid"`
+	ECowId    string `json:"ecowid"`
+	FrameId   int32  `json:"frameid"`
+	High      int32  `json:"High"`
+	Intake    int32  `json:"Intake"`
+	RuMina    int32  `json:"Rumina"`
+	Other     int32  `json:"Other"`
+	Activitys int32  `json:"activitys"`
+	Inactive  int32  `json:"inactive"`
+	Sver      int32  `json:"Sver"`
+	Remain    int32  `json:"Remain"`
+	RFRssi    int32  `json:"RFRssi"`
+	STATUS    int32  `json:"STATUS"`
+	BAT       int32  `json:"BAT"`
+	Imei      string `json:"imei"`
+	Gasp      int32  `json:"gasp"`
+}
+
+type NeckRingWrapper struct {
+	Type     string `json:"type"`
+	NeckRing struct {
+		NeckPck []*Behavior `json:"neck"` // neck_pck neck
+	} `json:"NeckRing"`
+}

+ 6 - 10
service/mqtt/interface.go

@@ -3,35 +3,31 @@ package mqtt
 import (
 import (
 	"fmt"
 	"fmt"
 	"kpt-pasture/config"
 	"kpt-pasture/config"
-	mqtt2 "kpt-pasture/module/mqtt"
 	"kpt-pasture/util"
 	"kpt-pasture/util"
 	"os"
 	"os"
 	"os/signal"
 	"os/signal"
 	"syscall"
 	"syscall"
 	"time"
 	"time"
 
 
-	"go.uber.org/dig"
+	handleMqtt "kpt-pasture/module/mqtt"
 
 
-	"gitee.com/xuyiping_admin/pkg/di"
 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
 	golangMqtt "github.com/eclipse/paho.mqtt.golang"
 	golangMqtt "github.com/eclipse/paho.mqtt.golang"
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"go.uber.org/zap"
 	"go.uber.org/zap"
 )
 )
 
 
-var Module = di.Options(di.Provide(NewServer))
+//var Module = di.Options(di.Provide(NewServer))
 
 
 type IMqttClient struct {
 type IMqttClient struct {
-	dig.In
 	golangMqtt.Client
 	golangMqtt.Client
-	Config  config.MqttSetting
+	Config config.MqttSetting
-	MqttHub mqtt2.Entry // 处理数据
 }
 }
 
 
 type IMqttServer interface {
 type IMqttServer interface {
 	Consumer(func([]byte))
 	Consumer(func([]byte))
 	Producer(top string, qos int32, data []byte) error
 	Producer(top string, qos int32, data []byte) error
-	Run()
+	Run(handleMqtt.Entry)
 	Close()
 	Close()
 }
 }
 
 
@@ -86,7 +82,7 @@ func (s *IMqttClient) Close() {
 	}
 	}
 }
 }
 
 
-func (s *IMqttClient) Run() {
+func (s *IMqttClient) Run(enter handleMqtt.Entry) {
 	// 设置信号监听以优雅关闭服务器
 	// 设置信号监听以优雅关闭服务器
 	stop := make(chan os.Signal, 1)
 	stop := make(chan os.Signal, 1)
 	signal.Notify(stop, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
 	signal.Notify(stop, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
@@ -114,5 +110,5 @@ func (s *IMqttClient) Run() {
 	}
 	}
 
 
 	// 启动数据处理
 	// 启动数据处理
-	s.Consumer(s.MqttHub.NeckRingHandle)
+	s.Consumer(enter.NeckRingHandle)
 }
 }