Prechádzať zdrojové kódy

mqtt: 集成mqtt服务

Yi 4 mesiacov pred
rodič
commit
10a60f89db
13 zmenil súbory, kde vykonal 229 pridanie a 19 odobranie
  1. 19 0
      cmd/mqtt.go
  2. 1 0
      cmd/root.go
  3. 14 0
      config/app.develop.yaml
  4. 17 0
      config/app.go
  5. 15 0
      config/app.test.yaml
  6. 2 0
      dep/dep.go
  7. 21 0
      dep/di_mqtt.go
  8. 7 3
      go.mod
  9. 16 16
      go.sum
  10. 29 0
      service/mqtt/interface.go
  11. 1 0
      service/mqtt/pub.go
  12. 83 0
      service/mqtt/sub.go
  13. 4 0
      util/util_test.go

+ 19 - 0
cmd/mqtt.go

@@ -0,0 +1,19 @@
+package cmd
+
+import (
+	"kpt-pasture/config"
+	"kpt-pasture/dep"
+
+	"github.com/spf13/cobra"
+)
+
+var MqttCmd = &cobra.Command{
+	Use:   "mqtt",
+	Short: "mqtt server",
+	Run: func(cmd *cobra.Command, args []string) {
+		conf := config.Options().Mqtt
+		dataEvent := dep.DIMqtt()
+		mqttClient := dataEvent.DataEventEntry.NewMqtt(conf)
+		dataEvent.DataEventEntry.SubMsg(conf, mqttClient)
+	},
+}

+ 1 - 0
cmd/root.go

@@ -23,4 +23,5 @@ func init() {
 	RootCmd.AddCommand(JobCmd)
 	RootCmd.AddCommand(CrontabCmd)
 	RootCmd.AddCommand(ConsumerCmd)
+	RootCmd.AddCommand(MqttCmd)
 }

+ 14 - 0
config/app.develop.yaml

@@ -42,3 +42,17 @@ cron:
   update_same_time: "0 20 1 * * ?"
   system_basic_crontab: "0 25 1 * * ?"
   cow_pregnant: "0 00 15 * * ?"
+mqtt:
+  broker: "47.92.95.119"
+  port: 1883
+  username: ""
+  password: ""
+  client_id: ""
+  topic: ""
+  qos: 0
+  retain: false
+  keep_alive: 60
+  connect_timeout: 10
+  auto_reconnect: true
+  reconnect_interval: 10
+  work_number: 1

+ 17 - 0
config/app.go

@@ -42,6 +42,7 @@ type AppConfig struct {
 	// asynq 相关配置
 	SideWorkSetting SideWorkSetting `yaml:"side_work_setting"`
 	CronSetting     CronSetting     `json:"cron_setting" yaml:"cron"`
+	Mqtt            MqttSetting     `json:"mqtt"`
 }
 
 type CronSetting struct {
@@ -154,6 +155,22 @@ type AsynqRedisSetting struct {
 	TLSConfig *tls.Config `json:"tlsConfig,omitempty" yaml:"tls_config"`
 }
 
+type MqttSetting struct {
+	Broker            string `json:"broker" yaml:"broker"`
+	Port              int    `json:"port" yaml:"port"`
+	UserName          string `json:"username" yaml:"username"`
+	Password          string `json:"password" yaml:"password"`
+	ClientId          string `json:"client" yaml:"client_id"`
+	Topic             string `json:"topic"  yaml:"topic"`
+	Retain            bool   `json:"retain" yaml:"retain"`
+	Qos               int    `json:"qos" yaml:"qos"`
+	KeepAlive         int    `json:"keepAlive" yaml:"keep_alive"`
+	ConnectTimeout    int    `json:"connectTimeout" yaml:"connect_timeout"`
+	AutoReconnect     bool   `json:"autoReconnect" yaml:"auto_reconnect"`
+	ReconnectInterval int    `json:"reconnectInterval" yaml:"reconnect_interval"`
+	WorkNumber        int    `json:"workNumber" yaml:"work_number"`
+}
+
 func (a *AppConfig) Name() string {
 	return fmt.Sprintf("%s-%s", a.AppName, a.AppEnv)
 }

+ 15 - 0
config/app.test.yaml

@@ -41,3 +41,18 @@ cron:
   update_same_time: "0 20 1 * * ?"
   system_basic_crontab: "0 25 1 * * ?"
   cow_pregnant: "0 00 15 * * ?"
+
+mqtt:
+  broker: "47.92.95.119"
+  port: 1883
+  username: ""
+  password: ""
+  client_id: ""
+  topic: ""
+  qos: 0
+  retain: false
+  keep_alive: 60
+  connect_timeout: 10
+  auto_reconnect: true
+  reconnect_interval: 10
+  work_number: 1

+ 2 - 0
dep/dep.go

@@ -6,6 +6,7 @@ import (
 	"kpt-pasture/module/consumer"
 	"kpt-pasture/module/crontab"
 	"kpt-pasture/service/asynqsvc"
+	"kpt-pasture/service/mqtt"
 	"kpt-pasture/service/redis"
 	"kpt-pasture/service/sso"
 	"kpt-pasture/service/wechat"
@@ -36,5 +37,6 @@ func Options() []di.HubOption {
 		consumer.Module,
 		redis.Module,
 		crontab.Module,
+		mqtt.Module,
 	}
 }

+ 21 - 0
dep/di_mqtt.go

@@ -0,0 +1,21 @@
+package dep
+
+import (
+	"kpt-pasture/service/mqtt"
+
+	"go.uber.org/dig"
+)
+
+func DIMqtt() (out MqttDependency) {
+	container := DI()
+	if err := container.Invoke(func(c MqttDependency) { out = c }); err != nil {
+		panic(err)
+	}
+	return
+}
+
+type MqttDependency struct {
+	dig.In
+
+	DataEventEntry mqtt.DataEvent
+}

+ 7 - 3
go.mod

@@ -6,6 +6,7 @@ require (
 	gitee.com/xuyiping_admin/go_proto v0.0.0-20241118130425-0268b6791546
 	gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
+	github.com/eclipse/paho.mqtt.golang v1.5.0
 	github.com/eko/gocache v1.1.0
 	github.com/getsentry/sentry-go v0.23.0
 	github.com/gin-contrib/cors v1.4.0
@@ -31,7 +32,11 @@ require (
 	gorm.io/gorm v1.25.2
 )
 
-require github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
+require (
+	github.com/gorilla/websocket v1.5.3 // indirect
+	github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
+	golang.org/x/arch v0.3.0 // indirect
+)
 
 require (
 	github.com/beorn7/perks v1.0.1 // indirect
@@ -95,9 +100,8 @@ require (
 	github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 // indirect
 	go.uber.org/atomic v1.9.0 // indirect
 	go.uber.org/multierr v1.8.0 // indirect
-	golang.org/x/arch v0.5.0 // indirect
 	golang.org/x/crypto v0.21.0 // indirect
-	golang.org/x/net v0.22.0 // indirect
+	golang.org/x/net v0.25.0 // indirect
 	golang.org/x/sys v0.18.0 // indirect
 	golang.org/x/text v0.15.0 // indirect
 	golang.org/x/time v0.3.0

+ 16 - 16
go.sum

@@ -36,8 +36,6 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241115062856-467312aafe19 h1:45rzK/vjqSDyvOV/Rkw6T9zTbTTWrb/ONmpa4FUrZMA=
-gitee.com/xuyiping_admin/go_proto v0.0.0-20241115062856-467312aafe19/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
 gitee.com/xuyiping_admin/go_proto v0.0.0-20241118130425-0268b6791546 h1:j6ocvgNQljRKDLvXYoAJKBTqONRedAqvZSJNdzfFrMI=
 gitee.com/xuyiping_admin/go_proto v0.0.0-20241118130425-0268b6791546/go.mod h1:BKrFW6YLDectlQcQk3FYKBeXvjEiodAKJ5rq7O/QiPE=
 gitee.com/xuyiping_admin/pkg v0.0.0-20241108060137-caea58c59f5b h1:w05MxH7yqveRlaRbxHhbif5YjPrJFodRPfOjYhXn7Zk=
@@ -127,6 +125,8 @@ github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:Htrtb
 github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
+github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
+github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
 github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
 github.com/eko/gocache v1.1.0 h1:FeER4gxA+lneYNeg/56obO6itD903LhPI0fT38J01WI=
 github.com/eko/gocache v1.1.0/go.mod h1:Q/KMUBMhv7CO4VahJStlTzMfFzP5dxTqs7D34NmJmVM=
@@ -172,7 +172,6 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb
 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
 github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
-github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
 github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
 github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
 github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs=
@@ -279,6 +278,8 @@ github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51
 github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
 github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
 github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
+github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
+github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI=
 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
@@ -608,7 +609,6 @@ go.uber.org/dig v1.15.0/go.mod h1:pKHs0wMynzL6brANhB2hLMro+zalv1osARTviTcqHLM=
 go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI=
 go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
-go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
 go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
 go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
@@ -621,8 +621,8 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
 go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
 go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
 golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
-golang.org/x/arch v0.5.0 h1:jpGode6huXQxcskEIpOCvrU+tzo81b6+oFLUYXWtH/Y=
-golang.org/x/arch v0.5.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
+golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k=
+golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -636,8 +636,8 @@ golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5y
 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
-golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
-golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
+golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
+golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -724,8 +724,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
 golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
 golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
 golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
-golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
-golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
+golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
+golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -748,8 +748,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
-golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
+golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -817,8 +817,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
-golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
+golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
@@ -836,8 +836,8 @@ golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
 golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
 golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
 golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
-golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
-golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
+golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
+golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=

+ 29 - 0
service/mqtt/interface.go

@@ -0,0 +1,29 @@
+package mqtt
+
+import (
+	"kpt-pasture/config"
+	"kpt-pasture/store/kptstore"
+
+	"gitee.com/xuyiping_admin/pkg/di"
+	golangMqtt "github.com/eclipse/paho.mqtt.golang"
+	"go.uber.org/dig"
+)
+
+var Module = di.Options(
+	di.Provide(NewDataEvent),
+)
+
+type DataEvent interface {
+	NewMqtt(configOption config.MqttSetting) golangMqtt.Client
+	SubMsg(configOption config.MqttSetting, client golangMqtt.Client)
+}
+
+type DataEventEntry struct {
+	dig.In
+
+	DB *kptstore.DB // DB
+}
+
+func NewDataEvent(entry DataEventEntry) DataEvent {
+	return &entry
+}

+ 1 - 0
service/mqtt/pub.go

@@ -0,0 +1 @@
+package mqtt

+ 83 - 0
service/mqtt/sub.go

@@ -0,0 +1,83 @@
+package mqtt
+
+import (
+	"fmt"
+	"kpt-pasture/config"
+	"sync"
+	"time"
+
+	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
+	golangMqtt "github.com/eclipse/paho.mqtt.golang"
+	"go.uber.org/zap"
+)
+
+var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
+	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
+}
+
+var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
+	zaplog.Info("connectedClient", zap.Any("client", client))
+}
+
+var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
+	zaplog.Info("connectLost", zap.Any("err", err.Error()))
+}
+
+func (d *DataEventEntry) NewMqtt(conf config.MqttSetting) golangMqtt.Client {
+	opts := golangMqtt.NewClientOptions()
+	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
+	opts.SetClientID(conf.ClientId)
+	opts.SetCleanSession(false)
+	opts.SetUsername(conf.UserName)
+	opts.SetPassword(conf.Password)
+	opts.SetAutoReconnect(conf.AutoReconnect)
+	opts.SetDefaultPublishHandler(messagePubHandler)
+	opts.OnConnect = connectHandler
+	opts.OnConnectionLost = connectLostHandler
+	client := golangMqtt.NewClient(opts)
+	if token := client.Connect(); token.Wait() && token.Error() != nil {
+		panic(token.Error())
+	}
+	return client
+}
+
+var bufferPool = sync.Pool{
+	New: func() interface{} {
+		return make([]byte, 1024) // 根据实际情况调整缓冲区大小
+	},
+}
+
+func (d *DataEventEntry) SubMsg(conf config.MqttSetting, client golangMqtt.Client) {
+	var subMsgChan = make(chan []byte, 2*conf.WorkNumber)
+	if token := client.Subscribe(conf.Topic, byte(conf.Qos), func(client golangMqtt.Client, msg golangMqtt.Message) {
+		buffer := bufferPool.Get().([]byte)
+		copy(buffer, msg.Payload())
+		subMsgChan <- buffer[:len(msg.Payload())]
+	}); token.Wait() && token.Error() != nil {
+		close(subMsgChan)
+		zaplog.Error("SubMsg", zap.Any("configOption", conf), zap.Any("err", token.Error()))
+		return
+	}
+
+	defer close(subMsgChan)
+	for i := 0; i < conf.WorkNumber; i++ {
+		go func() {
+			ticker := time.NewTicker(10 * time.Second) // 根据实际情况调整超时时间
+			defer ticker.Stop()
+			for {
+				select {
+				case msg := <-subMsgChan:
+					d.ProcessMessages(msg)
+					bufferPool.Put(msg)
+				case <-ticker.C:
+					zaplog.Info("subMsgChan timeout")
+					return
+				}
+			}
+		}()
+	}
+}
+
+func (d *DataEventEntry) ProcessMessages(msg []byte) {
+
+}

+ 4 - 0
util/util_test.go

@@ -411,3 +411,7 @@ func TestDaysBetween(t *testing.T) {
 		assert.Equal(t, got, v.got)
 	}
 }
+
+func Test_demo(t *testing.T) {
+
+}