Prechádzať zdrojové kódy

docker: add mqtt server

Yi 2 mesiacov pred
rodič
commit
5738d97180

+ 13 - 13
docker-compose.yml

@@ -29,16 +29,16 @@ services:
       - APP_ENVIRONMENT=production
       - PASTURE_WORK_DIR=/app/kpt-pasture
     command: [ "/app/kpt-pasture/kptPasture","http" ]
-  #kpt-pasture-consumer:
-    #privileged: true
-    #container_name: xdmy001_kpt_pasture_consumer
-    #restart: always
-    #image: registry.cn-hangzhou.aliyuncs.com/kpt-event/kpt-pasture:test
-      #volumes:
-      #- /var/logger/kpt-pasture/:/app/kpt-pasture/logger
-      #- /etc/localtime:/etc/localtime
-      #- /data/docker-compose/kpt-pasture/config:/app/kpt-pasture/config
-      #environment:
-      #- APP_ENVIRONMENT=production
-      #- PASTURE_WORK_DIR=/app/kpt-pasture
-    #command: [ "/app/kpt-pasture/kptPasture","consumer" ]
+  kpt-pasture-mqtt:
+    privileged: true
+    container_name: xdmy001_kpt_pasture_mqtt
+    restart: always
+    image: registry.cn-hangzhou.aliyuncs.com/kpt-event/kpt-pasture:test
+    volumes:
+      - /var/logger/kpt-pasture/:/app/kpt-pasture/logger
+      - /etc/localtime:/etc/localtime
+      - /data/docker-compose/kpt-pasture/config:/app/kpt-pasture/config
+    environment:
+      - APP_ENVIRONMENT=production
+      - PASTURE_WORK_DIR=/app/kpt-pasture
+    command: [ "/app/kpt-pasture/kptPasture","mqtt" ]

+ 5 - 0
model/cow.go

@@ -100,6 +100,11 @@ func (c *Cow) EventWeightUpdate(weight int64, weightAt int64) {
 	c.CurrentWeight = weight
 }
 
+// EventDiseaseUpdate 疾病更新
+func (c *Cow) EventDiseaseUpdate(healthStatus pasturePb.HealthStatus_Kind) {
+	c.HealthStatus = healthStatus
+}
+
 type CowSlice []*Cow
 
 func (c CowSlice) ToPB(

+ 37 - 1
model/event_cow_disease.go

@@ -1,6 +1,10 @@
 package model
 
-import pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
+import (
+	"time"
+
+	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
+)
 
 type EventCowDisease struct {
 	Id                    int64                       `json:"id"`
@@ -37,6 +41,38 @@ func (e *EventCowDisease) TableName() string {
 	return "event_cow_disease"
 }
 
+// EventUnDiseaseUpdate 诊断未生病
+func (e *EventCowDisease) EventUnDiseaseUpdate(operation *SystemUser, remarks string) {
+	e.DiagnosedResult = pasturePb.IsShow_No
+	e.DiagnoseOperationId = int32(operation.Id)
+	e.DiagnoseOperationName = operation.Name
+	e.Remarks = remarks
+}
+
+// EventDiseaseUpdate 诊断有生病
+func (e *EventCowDisease) EventDiseaseUpdate(disease *Disease, operationUser *SystemUser, temp float32) {
+	e.HealthStatus = pasturePb.HealthStatus_Disease
+	e.DiagnosedResult = pasturePb.IsShow_Ok
+	e.DiagnoseId = int32(disease.Id)
+	e.DiagnoseName = disease.Name
+	e.Temperature = int32(temp * 100)
+	e.DiagnoseOperationId = int32(operationUser.Id)
+	e.DiagnoseOperationName = operationUser.Name
+	e.DiseaseAt = time.Now().Unix()
+}
+
+// EventHealthStatusUpdate 改变健康状态
+func (e *EventCowDisease) EventHealthStatusUpdate(healthStatus pasturePb.HealthStatus_Kind) {
+	e.HealthStatus = healthStatus
+}
+
+// EventCurableUpdate 治愈
+func (e *EventCowDisease) EventCurableUpdate(cureAt int64) {
+	e.HealthStatus = pasturePb.HealthStatus_Curable
+	e.CurableAt = cureAt
+	e.DiagnosedResult = pasturePb.IsShow_Ok
+}
+
 func NewEventCowDisease(cow *Cow, disease *Disease, req *pasturePb.EventCowDiseaseRequest, operation, currUser *SystemUser) *EventCowDisease {
 	return &EventCowDisease{
 		CowId:           cow.Id,

+ 10 - 1
model/event_cow_treatment.go

@@ -23,6 +23,8 @@ type EventCowTreatment struct {
 	TreatmentResult    pasturePb.TreatmentResult_Kind `json:"treatmentResult"`
 	OperationId        int64                          `json:"operationId"`
 	OperationName      string                         `json:"operationName"`
+	MessageId          int64                          `json:"messageId"`
+	MessageName        string                         `json:"messageName"`
 	Remarks            string                         `json:"remarks"`
 	TreatmentAt        int64                          `json:"treatmentAt"`
 	CreatedAt          int64                          `json:"createdAt"`
@@ -33,7 +35,12 @@ func (e *EventCowTreatment) TableName() string {
 	return "event_cow_treatment"
 }
 
-func NewEventCowTreatment(prescription *Prescription, req *pasturePb.CowTreatmentRequest, diseaseTypeMap map[int32]string, operation *SystemUser) *EventCowTreatment {
+func NewEventCowTreatment(
+	prescription *Prescription,
+	req *pasturePb.CowTreatmentRequest,
+	diseaseTypeMap map[int32]string,
+	operation, currentUser *SystemUser,
+) *EventCowTreatment {
 	b, _ := json.Marshal(req.PrescriptionDetail)
 	return &EventCowTreatment{
 		CowId:              int64(req.CowId),
@@ -48,6 +55,8 @@ func NewEventCowTreatment(prescription *Prescription, req *pasturePb.CowTreatmen
 		TreatmentResult:    req.TreatmentResult,
 		OperationId:        operation.Id,
 		OperationName:      operation.Name,
+		MessageId:          currentUser.Id,
+		MessageName:        currentUser.Name,
 		Remarks:            req.Remarks,
 	}
 }

+ 4 - 0
model/prescription.go

@@ -27,6 +27,10 @@ func (p *Prescription) TableName() string {
 	return "prescription"
 }
 
+func (p *Prescription) EventUseCountUpdate() {
+	p.UseCount += 1
+}
+
 func NewPrescription(req *pasturePb.PrescriptionRequest, applicableDisease string, useDays,
 	meatExpiredDays, milkExpiredDays int32, systemUser *SystemUser) *Prescription {
 	return &Prescription{

+ 94 - 78
module/backend/event_health.go

@@ -8,6 +8,9 @@ import (
 	"strings"
 	"time"
 
+	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
+	"go.uber.org/zap"
+
 	"gorm.io/gorm"
 
 	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
@@ -19,17 +22,17 @@ func (s *StoreEntry) CowDiseaseCreate(ctx context.Context, req *pasturePb.EventC
 	// 牛只信息
 	cow, err := s.GetCowInfoByCowId(ctx, int64(req.CowId))
 	if err != nil {
-		return xerr.WithStack(err)
+		return xerr.Customf("牛只信息错误: %d", req.CowId)
 	}
 
 	operationUser, err := s.GetSystemUserById(ctx, int64(req.OperationId))
 	if err != nil {
-		return xerr.WithStack(err)
+		return xerr.Customf("请检查操作人信息")
 	}
 
 	currUser, err := s.GetCurrentSystemUser(ctx)
 	if err != nil {
-		return xerr.Custom("登录信息错误")
+		return xerr.Custom("登录信息错误,请退出重新登录")
 	}
 
 	disease, err := s.GetDiseaseById(ctx, req.DiseaseId)
@@ -140,7 +143,7 @@ func (s *StoreEntry) CowDiseaseCreate(ctx context.Context, req *pasturePb.EventC
 				TreatmentAt:        req.DiseaseAt,
 			}
 
-			newEventCowTreatment = model.NewEventCowTreatment(prescription, newCowTreatmentRequest, diseaseTypeMap, operationUser)
+			newEventCowTreatment = model.NewEventCowTreatment(prescription, newCowTreatmentRequest, diseaseTypeMap, operationUser, currUser)
 			// 创建治疗记录
 			if err = tx.Model(new(model.EventCowTreatment)).Create(newEventCowTreatment).Error; err != nil {
 				return xerr.WithStack(err)
@@ -211,14 +214,16 @@ func (s *StoreEntry) CowDiseaseList(ctx context.Context, req *pasturePb.SearchEv
 
 // CowDiseaseDiagnose 发病牛只诊断
 func (s *StoreEntry) CowDiseaseDiagnose(ctx context.Context, req *pasturePb.CowDiagnosedRequest) error {
+	cow, err := s.GetCowInfoByCowId(ctx, int64(req.CowId))
+	if err != nil {
+		return xerr.Customf("错误的牛只信息: %d", req.CowId)
+	}
+
 	eventCowDisease := &model.EventCowDisease{}
-	if err := s.DB.Where("cow_id = ?", req.CowId).
+	if err = s.DB.Where("cow_id = ?", req.CowId).
 		Where("id = ?", req.Id).
+		Where("health_status = ?", pasturePb.HealthStatus_Health).
 		First(eventCowDisease).Error; err != nil {
-		return xerr.WithStack(err)
-	}
-
-	if eventCowDisease.HealthStatus != pasturePb.HealthStatus_Health {
 		return xerr.Custom("异常牛只数据")
 	}
 
@@ -226,17 +231,14 @@ func (s *StoreEntry) CowDiseaseDiagnose(ctx context.Context, req *pasturePb.CowD
 	if err != nil {
 		return xerr.WithStack(err)
 	}
-	// 未发病
+
 	if req.DiagnosedResult == pasturePb.IsShow_No {
+		// 未发病更新
+		eventCowDisease.EventUnDiseaseUpdate(currentUser, req.Remarks)
 		if err = s.DB.Model(eventCowDisease).
+			Select("diagnosed_result,diagnose_operation_id,diagnose_operation_name,remarks").
 			Where("id = ?", req.Id).
-			Where("cow_id = ?", req.CowId).
-			Updates(map[string]interface{}{
-				"diagnosed_result":        pasturePb.IsShow_No,
-				"diagnose_operation_id":   currentUser.Id,
-				"diagnose_operation_name": currentUser.Name,
-				"remarks":                 req.Remarks,
-			}).Error; err != nil {
+			Updates(eventCowDisease).Error; err != nil {
 			return xerr.WithStack(err)
 		}
 		return nil
@@ -253,27 +255,20 @@ func (s *StoreEntry) CowDiseaseDiagnose(ctx context.Context, req *pasturePb.CowD
 	}
 
 	if err = s.DB.Transaction(func(tx *gorm.DB) error {
+		eventCowDisease.EventDiseaseUpdate(disease, systemUser, req.Temperature)
 		if err = tx.Model(eventCowDisease).
+			Select("health_status,diagnosed_result,diagnose_id,diagnose_name,temperature,diagnose_operation_id,diagnose_operation_name,diagnosed_at").
 			Where("id = ?", req.Id).
 			Where("cow_id = ?", req.CowId).
-			Updates(map[string]interface{}{
-				"health_status":           pasturePb.HealthStatus_Disease,
-				"diagnosed_result":        pasturePb.IsShow_Ok,
-				"diagnose_id":             req.DiseaseId,
-				"diagnose_name":           disease.Name,
-				"temperature":             int32(req.Temperature * 100),
-				"diagnose_operation_id":   req.OperationId,
-				"diagnose_operation_name": systemUser.Name,
-				"diagnosed_at":            time.Now().Unix(),
-			}).Error; err != nil {
+			Updates(eventCowDisease).Error; err != nil {
 			return xerr.WithStack(err)
 		}
 
-		if err = tx.Model(new(model.Cow)).
-			Where("id = ?", req.CowId).
-			Updates(map[string]interface{}{
-				"health_status": pasturePb.HealthStatus_Disease,
-			}).Error; err != nil {
+		cow.EventDiseaseUpdate(pasturePb.HealthStatus_Disease)
+		if err = tx.Model(cow).
+			Select("health_status").
+			Where("id = ?", cow.Id).
+			Updates(cow).Error; err != nil {
 			return xerr.WithStack(err)
 		}
 		return nil
@@ -298,12 +293,25 @@ func (s *StoreEntry) CowDiseaseTreatment(ctx context.Context, req *pasturePb.Cow
 		return xerr.Custom("异常牛只数据")
 	}
 
-	systemUser, _ := s.GetSystemUserById(ctx, int64(req.OperationId))
+	systemUser, err := s.GetSystemUserById(ctx, int64(req.OperationId))
+	if err != nil {
+		return xerr.Customf("操作人数据异常: %d", req.OperationId)
+	}
+
 	prescription, err := s.GetPrescriptionById(ctx, req.PrescriptionId)
 	if err != nil {
 		return xerr.WithStack(err)
 	}
 
+	currUser, err := s.GetCurrentSystemUser(ctx)
+	if err != nil {
+		return xerr.WithStack(err)
+	}
+	cow, err := s.GetCowInfoByCowId(ctx, int64(req.CowId))
+	if err != nil {
+		return xerr.Customf("异常牛数据: %d", req.CowId)
+	}
+
 	disease, err := s.GetDiseaseById(ctx, eventCowDisease.DiagnoseId)
 	if err != nil {
 		return xerr.WithStack(err)
@@ -340,34 +348,34 @@ func (s *StoreEntry) CowDiseaseTreatment(ctx context.Context, req *pasturePb.Cow
 		healthStatus = pasturePb.HealthStatus_Dead
 	}
 	diseaseTypeMap := s.DiseaseTypeMap()
-	newEventCowTreatment := model.NewEventCowTreatment(prescription, req, diseaseTypeMap, systemUser)
+	newEventCowTreatment := model.NewEventCowTreatment(prescription, req, diseaseTypeMap, systemUser, currUser)
 	if err = s.DB.Transaction(func(tx *gorm.DB) error {
 		if err = tx.Create(newEventCowTreatment).Error; err != nil {
 			return xerr.WithStack(err)
 		}
 
+		eventCowDisease.EventHealthStatusUpdate(healthStatus)
 		if err = tx.Model(eventCowDisease).
+			Select("health_status").
 			Where("id = ?", req.Id).
 			Where("cow_id = ?", req.CowId).
-			Updates(map[string]interface{}{
-				"health_status": healthStatus,
-			}).Error; err != nil {
+			Updates(eventCowDisease).Error; err != nil {
 			return xerr.WithStack(err)
 		}
 
+		cow.EventDiseaseUpdate(healthStatus)
 		if err = tx.Model(new(model.Cow)).
+			Select("health_status").
 			Where("id = ?", req.CowId).
-			Updates(map[string]interface{}{
-				"health_status": healthStatus,
-			}).Error; err != nil {
+			Updates(cow).Error; err != nil {
 			return xerr.WithStack(err)
 		}
 
-		if err = tx.Model(new(model.Prescription)).
+		prescription.EventUseCountUpdate()
+		if err = tx.Model(prescription).
+			Select("use_count").
 			Where("id = ?", prescription.Id).
-			Updates(map[string]interface{}{
-				"use_count": prescription.UseCount + 1,
-			}).Error; err != nil {
+			Updates(prescription).Error; err != nil {
 			return xerr.WithStack(err)
 		}
 		return nil
@@ -439,7 +447,7 @@ func (s *StoreEntry) CowDiseaseTreatmentDetail(
 	}
 
 	if err := s.DB.Model(new(model.EventCowTreatment)).
-		Select("*").Where("cow_disease_id IN ?", cowDiseaseIds).
+		Where("cow_disease_id IN ?", cowDiseaseIds).
 		Where("cow_id = ?", req.CowId).
 		Group("cow_disease_id").
 		Order("id desc").
@@ -464,56 +472,64 @@ func (s *StoreEntry) CowDiseaseCurable(ctx context.Context, req *pasturePb.Event
 	if err := s.DB.Where("id IN ?", req.Ids).
 		Where("health_status = ?", pasturePb.HealthStatus_Treatment).
 		Find(&eventCowDiseaseList).Error; err != nil {
-		return xerr.WithStack(err)
+		zaplog.Error("GetEventCowDiseaseList", zap.Any("err", err), zap.Any("req", req))
+		return xerr.Custom("异常数据")
 	}
 
 	if len(eventCowDiseaseList) == 0 {
 		return nil
 	}
-	if len(eventCowDiseaseList) != len(req.Ids) {
-		return xerr.New("id 参数错误")
-	}
+
 	operationUser, err := s.GetSystemUserById(ctx, int64(req.OperationId))
 	if err != nil {
-		return xerr.WithStack(err)
+		return xerr.Customf("该用户不存在: %d", req.OperationId)
 	}
 	eventCowTreatmentList := make([]*model.EventCowTreatment, 0)
 	for _, v := range eventCowDiseaseList {
 		eventCowTreatmentList = append(eventCowTreatmentList, &model.EventCowTreatment{
-			CowId:              v.CowId,
-			CowDiseaseId:       v.Id,
-			DiseaseId:          int64(v.DiseaseId),
-			DiseaseName:        v.DiseaseName,
-			PrescriptionId:     0,
-			PrescriptionName:   "",
-			PrescriptionDetail: "",
-			TreatmentResult:    pasturePb.TreatmentResult_Curable,
-			OperationId:        operationUser.Id,
-			OperationName:      operationUser.Name,
-			Remarks:            req.Remarks,
-			TreatmentAt:        int64(req.CurableAt),
+			CowId:           v.CowId,
+			CowDiseaseId:    v.Id,
+			DiseaseId:       int64(v.DiseaseId),
+			DiseaseName:     v.DiseaseName,
+			TreatmentResult: pasturePb.TreatmentResult_Curable,
+			OperationId:     operationUser.Id,
+			OperationName:   operationUser.Name,
+			Remarks:         req.Remarks,
+			TreatmentAt:     int64(req.CurableAt),
 		})
 	}
 
+	if len(eventCowTreatmentList) <= 0 {
+		return nil
+	}
+
 	if err = s.DB.Transaction(func(tx *gorm.DB) error {
-		if err = tx.Model(model.EventCowDisease{}).Where("id IN ?", req.Ids).
-			Where("health_status = ?", pasturePb.HealthStatus_Treatment).
-			Updates(map[string]interface{}{
-				"health_status":    pasturePb.HealthStatus_Curable,
-				"diagnosed_result": pasturePb.IsShow_Ok,
-				"curable_at":       req.CurableAt,
-			}).Error; err != nil {
-			return xerr.WithStack(err)
-		}
-		if err = tx.Model(model.Cow{}).
-			Where("id IN ?", req.Ids).
-			Updates(map[string]interface{}{
-				"health_status": pasturePb.HealthStatus_Curable,
-			}).Error; err != nil {
-			return xerr.WithStack(err)
+		cow := &model.Cow{}
+		for _, eventCowDisease := range eventCowDiseaseList {
+			eventCowDisease.EventCurableUpdate(int64(req.CurableAt))
+			if err = tx.Model(eventCowDisease).
+				Select("health_status,diagnosed_result,curable_at").
+				Where("id = ?", eventCowDisease.Id).
+				Where("health_status = ?", pasturePb.HealthStatus_Treatment).
+				Updates(eventCowDisease).Error; err != nil {
+				return xerr.WithStack(err)
+			}
+
+			cow, err = s.GetCowInfoByCowId(ctx, eventCowDisease.CowId)
+			if err != nil {
+				return xerr.WithStack(err)
+			}
+			// 更新牛只健康状态
+			cow.EventDiseaseUpdate(pasturePb.HealthStatus_Curable)
+			if err = tx.Model(cow).
+				Select("health_status").
+				Where("id = ?", eventCowDisease.CowId).
+				Updates(cow).Error; err != nil {
+				return xerr.WithStack(err)
+			}
 		}
 
-		if err = tx.Model(model.EventCowTreatment{}).Create(eventCowTreatmentList).Error; err != nil {
+		if err = tx.Model(new(model.EventCowTreatment)).Create(eventCowTreatmentList).Error; err != nil {
 			return xerr.WithStack(err)
 		}
 		return nil

+ 31 - 36
module/mqtt/handle.go

@@ -50,9 +50,9 @@ func (e *Entry) NeckRingHandle(data []byte) {
 		Mx:                     &sync.RWMutex{},
 	}
 
-	newData, _ := e.MsgDataFormat(data)
-	if newData != nil && len(newData) > 0 {
-		batchList = append(batchList, newData...)
+	newData := e.MsgDataFormat(data)
+	if newData != nil {
+		batchList = append(batchList, newData)
 		if len(batchList) >= batchSize {
 			DSMLog.Mx.Lock()
 			for _, batch := range batchList {
@@ -108,42 +108,37 @@ func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
 	return nil
 }
 
-func (e *Entry) MsgDataFormat(msg []byte) ([]*model.NeckRingOriginal, error) {
-	neckLog := &NeckRingWrapper{}
+func (e *Entry) MsgDataFormat(msg []byte) *model.NeckRingOriginal {
+	neckLog := &Behavior{}
 	if err := json.Unmarshal(msg, neckLog); err != nil {
 		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
 	}
-
-	batchList = make([]*model.NeckRingOriginal, 0)
-	if len(neckLog.NeckRing.NeckPck) > 0 {
-		for _, v := range neckLog.NeckRing.NeckPck {
-			// 存储到数据库
-			activeDate, hours := util.GetNeckRingActiveTimer(v.FrameId)
-			voltage, _ := strconv.ParseInt(strconv.FormatInt(int64(v.BAT), 16), 10, 64)
-			activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
-			if v.FrameId%10 == 8 {
-				activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
-			}
-			newData := &model.NeckRingOriginal{
-				Uuid:           v.UUID,
-				Imei:           v.ECowId,
-				ActiveDate:     activeDate,
-				Hours:          int32(hours),
-				FrameId:        v.FrameId,
-				Rumina:         v.RuMina,
-				Intake:         v.Intake,
-				Inactive:       v.Inactive,
-				Other:          v.Other,
-				High:           v.Activitys,
-				Active:         v.High,
-				Voltage:        int32(voltage),
-				Version:        v.Sver,
-				Remain:         v.Remain,
-				ReceiveNumber:  v.Imei,
-				ActiveDateType: activeDateTimeType,
-			}
-			batchList = append(batchList, newData)
+	if neckLog.Imei != "" {
+		// 存储到数据库
+		activeDate, hours := util.GetNeckRingActiveTimer(neckLog.FrameId)
+		voltage, _ := strconv.ParseInt(strconv.FormatInt(int64(neckLog.BAT), 16), 10, 64)
+		activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
+		if neckLog.FrameId%10 == 8 {
+			activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
+		}
+		return &model.NeckRingOriginal{
+			Uuid:           neckLog.UUID,
+			Imei:           neckLog.ECowId,
+			ActiveDate:     activeDate,
+			Hours:          int32(hours),
+			FrameId:        neckLog.FrameId,
+			Rumina:         neckLog.RuMina,
+			Intake:         neckLog.Intake,
+			Inactive:       neckLog.Inactive,
+			Other:          neckLog.Other,
+			High:           neckLog.Activitys,
+			Active:         neckLog.High,
+			Voltage:        int32(voltage),
+			Version:        neckLog.Sver,
+			Remain:         neckLog.Remain,
+			ReceiveNumber:  neckLog.Imei,
+			ActiveDateType: activeDateTimeType,
 		}
 	}
-	return batchList, nil
+	return nil
 }