Browse Source

event: weight update

Yi 4 months ago
parent
commit
72c03092ca

+ 1 - 1
go.mod

@@ -3,7 +3,7 @@ module kpt-pasture
 go 1.17
 
 require (
-	gitee.com/xuyiping_admin/go_proto v0.0.0-20241128101510-609ac4e83ada
+	gitee.com/xuyiping_admin/go_proto v0.0.0-20241128102506-727966a0f004
 	gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/eclipse/paho.mqtt.golang v1.4.3

+ 2 - 20
go.sum

@@ -36,26 +36,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241118130425-0268b6791546 h1:j6ocvgNQljRKDLvXYoAJKBTqONRedAqvZSJNdzfFrMI=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241118130425-0268b6791546/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241121031639-c1bda72e25b5 h1:8UyjUjPax9gkjWtFnpaA1pzM3Dz9cu6Y4CzoAt9791U=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241121031639-c1bda72e25b5/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241122085413-8b760c95f985 h1:cLxh8njy76yUYbWq3LbBYIzrnBrkPp0OM/pBFgEIQ9A=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241122085413-8b760c95f985/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241122090558-77fd1cd65cb5 h1:XCxiDeGzfmmWObxBuqloJMGW+2NsOzJDVWw9NIAMAbE=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241122090558-77fd1cd65cb5/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241122093434-2c4af4bddc59 h1:TxIsS8RgDS3hjM5YjdAvbwzYjD4Wr8nYDuUtj72aVYs=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241122093434-2c4af4bddc59/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241127070814-25f6418ad0c6 h1:ZNPcgtm3q+chmuxmW5XQVPl5yD2E3D8ZzyZWgL7W8RQ=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241127070814-25f6418ad0c6/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241127084727-32ff0826e19a h1:JDS3rqbwK46fLevp0/lnU6iJxMLaeOs1GfZhV9HSxmI=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241127084727-32ff0826e19a/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241127090200-9343aa730c3f h1:iRYyVkDAbWJG56FUqvbuNkMgDU7v+fkCI2iuepaNxEw=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241127090200-9343aa730c3f/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241128094754-04053736c8ab h1:9pYdwjsGBoT8frMz42gDq4muVRkGJuidkaMmLVgpICg=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241128094754-04053736c8ab/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241128101510-609ac4e83ada h1:4rHApLDEK0ueIUf3fpIdM5dXVz3S/uhn2fF7rmecgac=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241128101510-609ac4e83ada/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
+gitee.com/xuyiping_admin/go_proto v0.0.0-20241128102506-727966a0f004 h1:0kHmrqRNqiJuzIGIk+dkAsTd/7iAlPRKPYN5h2aaeTo=
+gitee.com/xuyiping_admin/go_proto v0.0.0-20241128102506-727966a0f004/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
 gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b h1:w05MxH7yqveRlaRbxHhbif5YjPrJFodRPfOjYhXn7Zk=
 gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b/go.mod h1:8tF25X6pE9WkFCczlNAC0K2mrjwKvhhp02I7o0HtDxY=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=

+ 2 - 2
http/route/cow_api.go

@@ -12,7 +12,7 @@ func CowAPI(opts ...func(engine *gin.Engine)) func(s *gin.Engine) {
 			opt(s)
 		}
 		// CowAPI API 组  牛只信息
-		eventRoute := authRouteGroup(s, "/api/v1/search/")
-		eventRoute.POST("/cow/list", cow.List)
+		eventRoute := authRouteGroup(s, "/api/v1/cow/")
+		eventRoute.POST("/list", cow.List)
 	}
 }

+ 3 - 3
http/route/event_api.go

@@ -47,14 +47,14 @@ func EventAPI(opts ...func(engine *gin.Engine)) func(s *gin.Engine) {
 
 		// 流产
 		eventRoute.POST("/abortion/list", event.AbortionList)
-		eventRoute.POST("/abortion/create/batch", event.AbortionCreateBatch)
+		eventRoute.POST("/abortion/batch", event.AbortionCreateBatch)
 
 		// 断奶
-		eventRoute.POST("/weaning/create/batch", event.WeaningCreateBatch)
+		eventRoute.POST("/weaning/batch", event.WeaningCreateBatch)
 
 		// 发病
 		eventRoute.POST("/disease/create", event.CowDiseaseCreate)
-		eventRoute.POST("/disease/list", event.CowDiseaseList)
+
 		// 疾病诊断
 		eventRoute.POST("/disease/diagnose", event.CowDiseaseDiagnose)
 		// 疾病治疗

+ 3 - 0
http/route/work.go

@@ -1,6 +1,7 @@
 package route
 
 import (
+	"kpt-pasture/http/handler/event"
 	"kpt-pasture/http/handler/work"
 
 	"github.com/gin-gonic/gin"
@@ -29,5 +30,7 @@ func WorkOrderAPI(opts ...func(engine *gin.Engine)) func(s *gin.Engine) {
 		workRoute.POST("/weaning/items", work.WeaningCowList)
 		workRoute.POST("/mating/items", work.MatingCowList)
 		workRoute.POST("/calving/items", work.CalvingList)
+		workRoute.POST("/disease/items", event.CowDiseaseList)
+
 	}
 }

+ 20 - 19
model/neck_ring_original_data.go

@@ -3,25 +3,26 @@ package model
 import pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
 
 type NeckRingOriginalData struct {
-	Id         int64                 `json:"id"`
-	SoftVer    int64                 `json:"softVer"`
-	Uuid       string                `json:"uuid"`
-	FrameId    int64                 `json:"frameId"`
-	CowId      string                `json:"cowId"`
-	Csq        int64                 `json:"csq"`
-	Temp       int64                 `json:"temp"`
-	Imei       string                `json:"imei" `
-	Active     int32                 `json:"active"`
-	InActive   int32                 `json:"inactive"`
-	RuMina     int32                 `json:"ruMina"`
-	Intake     int32                 `json:"intake"`
-	Gasp       int32                 `json:"gasp"`
-	Other      int32                 `json:"other"`
-	ReMain     int32                 `json:"remain"`
-	ActiveTime string                `json:"activeTime"`
-	IsShow     pasturePb.IsShow_Kind `json:"isShow"`
-	CreatedAt  int64                 `json:"createdAt"`
-	UpdatedAt  int64                 `json:"updatedAt"`
+	Id            int64                 `json:"id"`
+	SoftVer       int64                 `json:"softVer"`
+	Uuid          string                `json:"uuid"`
+	OriginFrameId int32                 `json:"originFrameId"`
+	FrameId       int32                 `json:"frameId"`
+	CowId         string                `json:"cowId"`
+	Csq           int64                 `json:"csq"`
+	Temp          int64                 `json:"temp"`
+	Imei          string                `json:"imei" `
+	Active        int32                 `json:"active"`
+	InActive      int32                 `json:"inactive"`
+	RuMina        int32                 `json:"ruMina"`
+	Intake        int32                 `json:"intake"`
+	Gasp          int32                 `json:"gasp"`
+	Other         int32                 `json:"other"`
+	ReMain        int32                 `json:"remain"`
+	ActiveTime    string                `json:"activeTime"`
+	IsShow        pasturePb.IsShow_Kind `json:"isShow"`
+	CreatedAt     int64                 `json:"createdAt"`
+	UpdatedAt     int64                 `json:"updatedAt"`
 }
 
 func (s *NeckRingOriginalData) TableName() string {

+ 1 - 1
module/backend/event_base.go

@@ -293,7 +293,7 @@ func (s *StoreEntry) WeightBatch(ctx context.Context, req *pasturePb.EventWeight
 			return xerr.WithStack(err)
 		}
 		var weight = int32(item.Weight * 100)
-		weightEvent = append(weightEvent, model.NewEventWeight(cow, currentUser, weight, int32(item.Height), req))
+		weightEvent = append(weightEvent, model.NewEventWeight(cow, currentUser, weight, item.Height, req))
 	}
 
 	if len(weightEvent) <= 0 {

+ 10 - 10
module/backend/event_health.go

@@ -110,37 +110,37 @@ func (s *StoreEntry) CowDiseaseCreate(ctx context.Context, req *pasturePb.EventC
 func (s *StoreEntry) CowDiseaseList(ctx context.Context, req *pasturePb.SearchEventCowTreatmentRequest, pagination *pasturePb.PaginationModel) (*pasturePb.EventCowDiseaseResponse, error) {
 	cowDiseaseList := make([]*model.EventCowDisease, 0)
 	var count int64 = 0
-	pref := s.DB.Model(new(model.EventCowDisease)).
-		Where("diagnosed_result < ?", pasturePb.IsShow_No)
+	pref := s.DB.Select("a.*,b.name").Table(fmt.Sprintf("%s as a", new(model.EventCowDisease).TableName())).
+		Joins(fmt.Sprintf("JOIN %s AS b on a.pen_id = b.id", new(model.Pen).TableName())).
+		Where("a.diagnosed_result < ?", pasturePb.IsShow_No)
 
 	if len(req.CowIds) > 0 {
-		pref.Where("cow_id IN ?", req.CowIds)
+		pref.Where("a.cow_id IN ?", req.CowIds)
 	}
 
 	if req.DiseaseId > 0 {
-		pref.Where("disease_id = ?", req.DiseaseId)
+		pref.Where("a.disease_id = ?", req.DiseaseId)
 	}
 
 	if req.PenId > 0 {
-		pref.Where("pen_id = ?", req.PenId)
+		pref.Where("a.pen_id = ?", req.PenId)
 	}
 
 	if req.Lact > 0 {
-		pref.Where("lact = ?", req.Lact)
+		pref.Where("a.lact = ?", req.Lact)
 	}
 
 	if req.MinDayAge > 0 && req.MaxDayAge > 0 && req.MaxDayAge >= req.MinDayAge {
-		pref.Where("day_age BETWEEN ? AND ?", req.MinDayAge, req.MaxDayAge)
+		pref.Where("a.day_age BETWEEN ? AND ?", req.MinDayAge, req.MaxDayAge)
 	}
 
 	if req.HealthStatus > 0 {
-		pref.Where("health_status = ?", req.HealthStatus)
+		pref.Where("a.health_status = ?", req.HealthStatus)
 	}
 
-	if err := pref.Order("id desc").
+	if err := pref.Order("a.id desc").
 		Count(&count).Limit(int(pagination.PageSize)).
 		Offset(int(pagination.PageOffset)).
-		Order("id desc").
 		Find(&cowDiseaseList).Error; err != nil {
 		return nil, xerr.WithStack(err)
 	}

+ 1 - 2
module/backend/interface.go

@@ -185,8 +185,6 @@ type EventService interface {
 
 	// CowDiseaseCreate 提交发病牛只
 	CowDiseaseCreate(ctx context.Context, req *pasturePb.EventCowDiseaseRequest) error
-	// CowDiseaseList  发病牛只列表
-	CowDiseaseList(ctx context.Context, req *pasturePb.SearchEventCowTreatmentRequest, pagination *pasturePb.PaginationModel) (*pasturePb.EventCowDiseaseResponse, error)
 	// CowDiseaseDiagnose 诊断
 	CowDiseaseDiagnose(ctx context.Context, req *pasturePb.CowDiagnosedRequest) error
 	// CowDiseaseTreatment 治疗
@@ -265,4 +263,5 @@ type WorkService interface {
 	WeaningCowList(ctx context.Context, req *pasturePb.ItemsRequest, pagination *pasturePb.PaginationModel) (*pasturePb.WeaningItemsResponse, error)
 	MatingCowList(ctx context.Context, req *pasturePb.ItemsRequest, pagination *pasturePb.PaginationModel) (*pasturePb.MatingItemsResponse, error)
 	CalvingCowList(ctx context.Context, req *pasturePb.ItemsRequest, pagination *pasturePb.PaginationModel) (*pasturePb.CalvingItemsResponse, error)
+	CowDiseaseList(ctx context.Context, req *pasturePb.SearchEventCowTreatmentRequest, pagination *pasturePb.PaginationModel) (*pasturePb.EventCowDiseaseResponse, error)
 }

+ 6 - 2
module/crontab/neck_ring.go

@@ -17,7 +17,8 @@ func (e *Entry) NeckRingData() error {
 	neckRingList := make([]*model.NeckRingOriginalData, 0)
 	if err := e.DB.Model(new(model.NeckRingOriginalData)).
 		Where("is_show = ?", pasturePb.IsShow_No).
-		Order("id asc").Limit(10000).Find(&neckRingList).Error; err != nil {
+		Order("id asc").Limit(int(limit)).
+		Find(&neckRingList).Error; err != nil {
 		return xerr.WithStack(err)
 	}
 
@@ -44,6 +45,9 @@ func (e *Entry) NeckRingData() error {
 	return nil
 }
 
-func (e *Entry) NeckRingOriginalDataMerge() {
+func (e *Entry) NeckRingOriginalDataMerge(dataList []*model.NeckRingLog) {
+	if len(dataList) <= 0 {
+		return
+	}
 
 }

+ 2 - 3
service/asynqsvc/asynq.go

@@ -12,8 +12,7 @@ import (
 	"go.uber.org/zap"
 )
 
-var Module = di.Options(
-	di.Provide(NewClient))
+var Module = di.Options(di.Provide(NewClient))
 
 type Server struct {
 	*asynq.Server
@@ -59,7 +58,7 @@ func asynqMiddlewareLog(h asynq.Handler) asynq.Handler {
 		if err := h.ProcessTask(ctx, task); err != nil {
 			return err
 		}
-		zaplog.Info("asynq middleware Process end", zap.Any("task.Type", task.Type()), zap.Any("Time", time.Since(start)))
+		zaplog.Info("asynqMiddlewareLog", zap.Any("task.Type", task.Type()), zap.Any("Time", time.Since(start)))
 		return nil
 	})
 }

+ 1 - 3
service/mqtt/interface.go

@@ -9,9 +9,7 @@ import (
 	"go.uber.org/dig"
 )
 
-var Module = di.Options(
-	di.Provide(NewDataEvent),
-)
+var Module = di.Options(di.Provide(NewDataEvent))
 
 type DataEvent interface {
 	NewMqtt(configOption config.MqttSetting) golangMqtt.Client

+ 17 - 17
service/mqtt/sub.go

@@ -102,7 +102,7 @@ func (d *DataEventEntry) ProcessMessages(msg []byte) {
 	currHour := nowDayTime.Hour() + 2
 	frameIdHour := neckRingOriginalData.FrameId * 2
 	frameDayTime := fmt.Sprintf("%s %s:00:00", nowDayTime.Format(model.LayoutDate2), fmt.Sprintf("%02d", frameIdHour))
-	if frameIdHour > int64(currHour) {
+	if frameIdHour > int32(currHour) {
 		frameDayTime = fmt.Sprintf("%s %s:00:00", nowDayTime.AddDate(0, 0, -1).Format(model.LayoutDate2), fmt.Sprintf("%02d", frameIdHour))
 	}
 
@@ -240,21 +240,21 @@ func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.NeckRingOriginalData,
 	}
 
 	return &model.NeckRingOriginalData{
-		SoftVer:  softVer,
-		Uuid:     uuid,
-		FrameId:  frameId,
-		CowId:    cowId,
-		Csq:      csq,
-		Temp:     int64(temp * 100),
-		Imei:     imei,
-		Active:   int32(active),
-		InActive: int32(inAction),
-		RuMina:   int32(ruMina),
-		Intake:   int32(intake),
-		Gasp:     int32(gasp),
-		Other:    int32(other),
-		ReMain:   int32(reMain),
-		IsShow:   pasturePb.IsShow_No,
+		SoftVer:       softVer,
+		Uuid:          uuid,
+		OriginFrameId: int32(frameId),
+		FrameId:       int32(frameId),
+		CowId:         cowId,
+		Csq:           csq,
+		Temp:          int64(temp * 100),
+		Imei:          imei,
+		Active:        int32(active),
+		InActive:      int32(inAction),
+		RuMina:        int32(ruMina),
+		Intake:        int32(intake),
+		Gasp:          int32(gasp),
+		Other:         int32(other),
+		ReMain:        int32(reMain),
+		IsShow:        pasturePb.IsShow_No,
 	}, nil
-
 }

+ 4 - 22
service/mqtt2/mqtt.go

@@ -10,18 +10,11 @@ import (
 	"go.uber.org/zap"
 )
 
-var Module = di.Options(
-	di.Provide(NewServer),
-)
-
-type Server interface {
-	Subscribe(topic string, qos int32) []byte
-	Publish(topic string, qos int32, retained bool, payload string) error
-}
+var Module = di.Options(di.Provide(NewServer))
 
 type MqttServer struct {
-	client golangMqtt.Client
-	Conf   config.MqttSetting
+	Client golangMqtt.Client
+	Conf   *config.MqttSetting
 }
 
 func NewServer(conf config.MqttSetting) *MqttServer {
@@ -40,7 +33,7 @@ func NewServer(conf config.MqttSetting) *MqttServer {
 		panic(token.Error())
 	}
 
-	return &MqttServer{client: client, Conf: conf}
+	return &MqttServer{Client: client}
 }
 
 var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
@@ -54,14 +47,3 @@ var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client)
 var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
 	zaplog.Info("connectLost", zap.Any("err", err.Error()))
 }
-
-func (s *MqttServer) Subscribe(topic string, qos int32, handler golangMqtt.MessageHandler) {
-	if token := s.client.Subscribe(topic, byte(qos), handler); token.Wait() && token.Error() != nil {
-		panic(token.Error())
-	}
-}
-
-func (s *MqttServer) Publish(topic string, qos int32, retained bool, payload string) {
-	token := s.client.Publish(topic, byte(qos), retained, payload)
-	token.Wait()
-}

+ 33 - 9
service/mqtt2/sub.go

@@ -1,31 +1,55 @@
 package mqtt2
 
 import (
-	"fmt"
+	"kpt-pasture/config"
 	"sync"
 
-	golangMqtt "github.com/eclipse/paho.mqtt.golang"
+	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
+	"go.uber.org/zap"
+
+	mqtt "github.com/eclipse/paho.mqtt.golang"
 
 	"go.uber.org/dig"
 )
 
-type MQTTConsumer struct {
+type MQTTClient struct {
 	dig.In
 
 	MQTTServer *MqttServer
 }
 
+func NewMqttService(consumer *MqttServer) *MQTTClient {
+	return &MQTTClient{MQTTServer: consumer}
+}
+
 var bufferPool = sync.Pool{
 	New: func() interface{} {
 		return make([]byte, 1024) // 根据实际情况调整缓冲区大小
 	},
 }
 
-func (c *MQTTConsumer) Start() {
-
-	handler := func(client golangMqtt.Client, msg golangMqtt.Message) {
-		fmt.Printf("MQTT message received: %s\n", msg.Payload())
-		// 处理消息的逻辑
+// StartConsuming 处理收到的消息
+func (s *MQTTClient) StartConsuming(conf *config.MqttSetting) <-chan []byte {
+	var subMsgChan = make(chan []byte, 2*conf.WorkNumber)
+	defer close(subMsgChan)
+	if token := s.MQTTServer.Client.Subscribe(conf.Topic, byte(conf.Qos), func(client mqtt.Client, msg mqtt.Message) {
+		buffer := bufferPool.Get().([]byte)
+		copy(buffer, msg.Payload())
+		select {
+		case subMsgChan <- buffer[:len(msg.Payload())]:
+			bufferPool.Put(buffer)
+		}
+	}); token.Wait() && token.Error() != nil {
+		close(subMsgChan)
+		zaplog.Error("SubMsg", zap.Any("configOption", conf), zap.Any("err", token.Error()))
+		return subMsgChan
 	}
-	c.MQTTServer.Subscribe(c.MQTTServer.Conf.Topic, int32(c.MQTTServer.Conf.Qos), handler)
+
+	go func() {
+		for msg := range subMsgChan {
+			bufferPool.Put(msg)
+		}
+	}()
+
+	return subMsgChan
 }