Browse Source

mqtt: 重新封装

Yi 3 months ago
parent
commit
9540c046df

+ 4 - 2
dep/dep.go

@@ -2,9 +2,10 @@ package dep
 
 import (
 	"kpt-pasture/config"
+	"kpt-pasture/module/asynq"
 	"kpt-pasture/module/backend"
-	"kpt-pasture/module/consumer"
 	"kpt-pasture/module/crontab"
+	moduleMqtt "kpt-pasture/module/mqtt"
 	"kpt-pasture/service/asynqsvc"
 	"kpt-pasture/service/mqtt"
 	"kpt-pasture/service/mqtt2"
@@ -35,10 +36,11 @@ func Options() []di.HubOption {
 		sso.Module,
 		wechat.Module,
 		asynqsvc.Module,
-		consumer.Module,
+		asynq.Module,
 		redis.Module,
 		crontab.Module,
 		mqtt.Module,
 		mqtt2.Module,
+		moduleMqtt.Module,
 	}
 }

+ 2 - 2
dep/di_asynq.go

@@ -4,7 +4,7 @@ import (
 	"fmt"
 	"kpt-pasture/config"
 	"kpt-pasture/model"
-	"kpt-pasture/module/consumer"
+	"kpt-pasture/module/asynq"
 	"kpt-pasture/service/asynqsvc"
 
 	"go.uber.org/dig"
@@ -35,5 +35,5 @@ func AsynqWorkOrder(dep AsyncDependency) *asynqsvc.Server {
 type AsyncDependency struct {
 	dig.In
 
-	WorkOrder consumer.BizExec // BizExec 工单
+	WorkOrder asynq.BizExec // BizExec 工单
 }

+ 19 - 0
dep/di_mqtt.go

@@ -19,3 +19,22 @@ type MqttDependency struct {
 
 	DataEventEntry mqtt.DataEvent
 }
+
+/*func DIMqtt() (out MqttDependency) {
+	container := DI()
+	if err := container.Provide(MqttDependency); err != nil {
+		panic(err)
+	}
+	if err := container.Invoke(func(c *mqtt2.MqttServer) { out = c }); err != nil {
+		panic(err)
+	}
+	return
+}
+
+type MqttDependency struct {
+	dig.In
+
+	//DataEventEntry mqtt.DataEvent
+	MqttClient mqtt2.MqttServer
+}
+*/

+ 1 - 1
module/consumer/consumer.go → module/asynq/consumer.go

@@ -1,4 +1,4 @@
-package consumer
+package asynq
 
 import (
 	"context"

+ 2 - 4
module/consumer/interface.go → module/asynq/interface.go

@@ -1,4 +1,4 @@
-package consumer
+package asynq
 
 import (
 	"context"
@@ -12,9 +12,7 @@ import (
 	"go.uber.org/dig"
 )
 
-var Module = di.Options(
-	di.Provide(NewJob),
-)
+var Module = di.Options(di.Provide(NewJob))
 
 func NewJob(entry Entry) BizExec {
 	return &entry

+ 28 - 0
module/mqtt/interface.go

@@ -0,0 +1,28 @@
+package mqtt
+
+import (
+	"context"
+	"kpt-pasture/config"
+	"kpt-pasture/service/mqtt2"
+	"kpt-pasture/store/kptstore"
+
+	"gitee.com/xuyiping_admin/pkg/di"
+	"go.uber.org/dig"
+)
+
+var Module = di.Options(di.Provide(NewMqtt))
+
+func NewMqtt(entry Entry) Exec {
+	return &entry
+}
+
+type Entry struct {
+	dig.In
+	Cfg        *config.AppConfig
+	DB         *kptstore.DB
+	MqttClient mqtt2.MqttServer
+}
+
+type Exec interface {
+	Subscribe(ctx context.Context, msg []byte) error
+}

+ 10 - 0
module/mqtt/mqtt.go

@@ -0,0 +1,10 @@
+package mqtt
+
+import (
+	"context"
+)
+
+func (s *Entry) Subscribe(ctx context.Context, msg []byte) error {
+
+	return nil
+}

+ 5 - 18
service/mqtt2/sub.go → service/mqtt2/consumer.go

@@ -1,38 +1,25 @@
 package mqtt2
 
 import (
-	"kpt-pasture/config"
 	"sync"
 
 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
 	"go.uber.org/zap"
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
-
-	"go.uber.org/dig"
 )
 
-type MQTTClient struct {
-	dig.In
-
-	MQTTServer *MqttServer
-}
-
-func NewMqttService(consumer *MqttServer) *MQTTClient {
-	return &MQTTClient{MQTTServer: consumer}
-}
-
 var bufferPool = sync.Pool{
 	New: func() interface{} {
 		return make([]byte, 1024) // 根据实际情况调整缓冲区大小
 	},
 }
 
-// StartConsuming 处理收到的消息
-func (s *MQTTClient) StartConsuming(conf *config.MqttSetting) <-chan []byte {
-	var subMsgChan = make(chan []byte, 2*conf.WorkNumber)
+// Consumer 处理收到的消息
+func (s *MqttClient) Consumer(topic string, qos, workNumber int32) <-chan []byte {
+	var subMsgChan = make(chan []byte, 2*workNumber)
 	defer close(subMsgChan)
-	if token := s.MQTTServer.Client.Subscribe(conf.Topic, byte(conf.Qos), func(client mqtt.Client, msg mqtt.Message) {
+	if token := s.Client.Subscribe(topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
 		buffer := bufferPool.Get().([]byte)
 		copy(buffer, msg.Payload())
 		select {
@@ -41,7 +28,7 @@ func (s *MQTTClient) StartConsuming(conf *config.MqttSetting) <-chan []byte {
 		}
 	}); token.Wait() && token.Error() != nil {
 		close(subMsgChan)
-		zaplog.Error("SubMsg", zap.Any("configOption", conf), zap.Any("err", token.Error()))
+		zaplog.Error("Consumer", zap.Any("topic", topic), zap.Any("err", token.Error()))
 		return subMsgChan
 	}
 

+ 22 - 5
service/mqtt2/mqtt.go → service/mqtt2/interface.go

@@ -3,6 +3,9 @@ package mqtt2
 import (
 	"fmt"
 	"kpt-pasture/config"
+	"sync"
+
+	"go.uber.org/dig"
 
 	"gitee.com/xuyiping_admin/pkg/di"
 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
@@ -12,12 +15,19 @@ import (
 
 var Module = di.Options(di.Provide(NewServer))
 
-type MqttServer struct {
+type MqttClient struct {
+	dig.In
 	Client golangMqtt.Client
-	Conf   *config.MqttSetting
+	mx     sync.Mutex
+}
+
+type MqttServer interface {
+	Consumer(top string, qos, workNumber int32) <-chan []byte
+	Producer(top string, qos int32, data []byte) error
+	Close()
 }
 
-func NewServer(conf config.MqttSetting) *MqttServer {
+func NewServer(conf config.MqttSetting) *MqttClient {
 	opts := golangMqtt.NewClientOptions()
 	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
 	opts.SetClientID(conf.ClientId)
@@ -32,8 +42,7 @@ func NewServer(conf config.MqttSetting) *MqttServer {
 	if token := client.Connect(); token.Wait() && token.Error() != nil {
 		panic(token.Error())
 	}
-
-	return &MqttServer{Client: client}
+	return &MqttClient{Client: client}
 }
 
 var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
@@ -47,3 +56,11 @@ var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client)
 var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
 	zaplog.Info("connectLost", zap.Any("err", err.Error()))
 }
+
+func (s *MqttClient) Close() {
+	s.mx.Lock()
+	defer s.mx.Unlock()
+	if s.Client.IsConnected() {
+		s.Client.Disconnect(250)
+	}
+}

+ 14 - 0
service/mqtt2/producer.go

@@ -0,0 +1,14 @@
+package mqtt2
+
+import (
+	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
+	"go.uber.org/zap"
+)
+
+func (s *MqttClient) Producer(top string, qos int32, data []byte) error {
+	if token := s.Client.Publish(top, byte(qos), false, data); token.Wait() && token.Error() != nil {
+		zaplog.Error("producer", zap.String("topic", top), zap.String("data", string(data)), zap.Any("err", token.Error()))
+		return token.Error()
+	}
+	return nil
+}