Browse Source

mqtt: 更新mqtt代码逻辑

Yi 4 months ago
parent
commit
fbdf522938
6 changed files with 35 additions and 41 deletions
  1. 2 2
      config/app.develop.yaml
  2. 2 2
      config/app.test.yaml
  3. 4 4
      go.mod
  4. 10 10
      go.sum
  5. 9 6
      module/crontab/cow_cron.go
  6. 8 17
      service/mqtt/sub.go

+ 2 - 2
config/app.develop.yaml

@@ -47,8 +47,8 @@ mqtt:
   port: 1883
   username: ""
   password: ""
-  client_id: ""
-  topic: ""
+  client_id: "ping"
+  topic: "a"
   qos: 0
   retain: false
   keep_alive: 60

+ 2 - 2
config/app.test.yaml

@@ -47,8 +47,8 @@ mqtt:
   port: 1883
   username: ""
   password: ""
-  client_id: ""
-  topic: ""
+  client_id: "ping"
+  topic: "a"
   qos: 0
   retain: false
   keep_alive: 60

+ 4 - 4
go.mod

@@ -6,7 +6,7 @@ require (
 	gitee.com/xuyiping_admin/go_proto v0.0.0-20241118130425-0268b6791546
 	gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
-	github.com/eclipse/paho.mqtt.golang v1.5.0
+	github.com/eclipse/paho.mqtt.golang v1.4.3
 	github.com/eko/gocache v1.1.0
 	github.com/getsentry/sentry-go v0.23.0
 	github.com/gin-contrib/cors v1.4.0
@@ -27,7 +27,7 @@ require (
 	github.com/yangxikun/gin-limit-by-key v0.0.0-20190512072151-520697354d5f
 	go.uber.org/dig v1.15.0
 	go.uber.org/zap v1.24.0
-	golang.org/x/sync v0.6.0
+	golang.org/x/sync v0.7.0
 	gorm.io/driver/mysql v1.5.1
 	gorm.io/gorm v1.25.2
 )
@@ -100,9 +100,9 @@ require (
 	github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 // indirect
 	go.uber.org/atomic v1.9.0 // indirect
 	go.uber.org/multierr v1.8.0 // indirect
-	golang.org/x/crypto v0.21.0 // indirect
+	golang.org/x/crypto v0.23.0 // indirect
 	golang.org/x/net v0.25.0 // indirect
-	golang.org/x/sys v0.18.0 // indirect
+	golang.org/x/sys v0.20.0 // indirect
 	golang.org/x/text v0.15.0 // indirect
 	golang.org/x/time v0.3.0
 	google.golang.org/appengine v1.6.8 // indirect

+ 10 - 10
go.sum

@@ -125,8 +125,8 @@ github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:Htrtb
 github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
-github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
-github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
+github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
+github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
 github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
 github.com/eko/gocache v1.1.0 h1:FeER4gxA+lneYNeg/56obO6itD903LhPI0fT38J01WI=
 github.com/eko/gocache v1.1.0/go.mod h1:Q/KMUBMhv7CO4VahJStlTzMfFzP5dxTqs7D34NmJmVM=
@@ -636,8 +636,8 @@ golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5y
 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
-golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
-golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
+golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
+golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -724,8 +724,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
 golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
 golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
 golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
-golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
-golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
+golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
+golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -817,8 +817,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
-golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
+golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
@@ -836,8 +836,8 @@ golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
 golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
 golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
 golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
-golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
-golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
+golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
+golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=

+ 9 - 6
module/crontab/cow_cron.go

@@ -199,23 +199,26 @@ func (e *Entry) SameTimePlan() error {
 		}
 
 		cowList := make([]*model.Cow, 0)
-		pref := e.DB.Where("admission_status = ?", pasturePb.AdmissionStatus_Admission)
+		pref := e.DB.Model(new(model.Cow)).
+			Where("admission_status = ?", pasturePb.AdmissionStatus_Admission).
+			Where("sex = ?", pasturePb.Genders_Male)
 
-		if sameTime.CowType == pasturePb.SameTimeCowType_Breeding_Calf {
+		switch sameTime.CowType {
+		case pasturePb.SameTimeCowType_Breeding_Calf:
 			pref.Where("calving_age >= ?", sameTime.PostpartumDaysStart).
 				Where("calving_age <= ?", sameTime.PostpartumDaysEnd).
 				Where("is_pregnant = ?", pasturePb.IsShow_No)
-		}
-
-		if sameTime.CowType == pasturePb.SameTimeCowType_Empty {
+		case pasturePb.SameTimeCowType_Empty:
 			pref.Where(
 				e.DB.Where("breed_status = ?", pasturePb.BreedStatus_Empty).
 					Or("breed_status = ?", pasturePb.BreedStatus_Abort),
 			).Where("is_pregnant = ?", pasturePb.IsShow_No)
+		default:
+			continue
 		}
 
 		if err := pref.Find(&cowList).Error; err != nil {
-			zaplog.Error("crontab", zap.Any("SameTimePlan", err), zap.Any("plan", sameTime))
+			zaplog.Error("crontab", zap.Any("err", err), zap.Any("sameTime", sameTime))
 			return xerr.WithStack(err)
 		}
 

+ 8 - 17
service/mqtt/sub.go

@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"kpt-pasture/config"
 	"sync"
-	"time"
 
 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
 	golangMqtt "github.com/eclipse/paho.mqtt.golang"
@@ -60,24 +59,16 @@ func (d *DataEventEntry) SubMsg(conf config.MqttSetting, client golangMqtt.Clien
 	}
 
 	defer close(subMsgChan)
-	for i := 0; i < conf.WorkNumber; i++ {
-		go func() {
-			ticker := time.NewTicker(10 * time.Second) // 根据实际情况调整超时时间
-			defer ticker.Stop()
-			for {
-				select {
-				case msg := <-subMsgChan:
-					d.ProcessMessages(msg)
-					bufferPool.Put(msg)
-				case <-ticker.C:
-					zaplog.Info("subMsgChan timeout")
-					return
-				}
-			}
-		}()
+	for {
+		select {
+		case msg := <-subMsgChan:
+			bufferPool.Put(msg)
+			d.ProcessMessages(msg)
+		}
 	}
+
 }
 
 func (d *DataEventEntry) ProcessMessages(msg []byte) {
-
+	fmt.Println("===byte==", string(msg))
 }