Selaa lähdekoodia

mqtt: mqtt服务抽离优化

Yi 3 kuukautta sitten
vanhempi
commit
03a65515ca

+ 1 - 4
cmd/mqtt.go

@@ -1,7 +1,6 @@
 package cmd
 
 import (
-	"fmt"
 	"kpt-pasture/dep"
 
 	"github.com/spf13/cobra"
@@ -11,8 +10,6 @@ var MqttCmd = &cobra.Command{
 	Use:   "mqtt",
 	Short: "mqtt server",
 	Run: func(cmd *cobra.Command, args []string) {
-		dep.MqttRun().Consumer(func(bytes []byte) {
-			fmt.Println("====bytes====", string(bytes))
-		})
+		dep.DIMqttService().MqttServer.Run()
 	},
 }

+ 0 - 2
dep/dep.go

@@ -8,7 +8,6 @@ import (
 	moduleMqtt "kpt-pasture/module/mqtt"
 	"kpt-pasture/service/asynqsvc"
 	"kpt-pasture/service/mqtt"
-	"kpt-pasture/service/mqtt2"
 	"kpt-pasture/service/redis"
 	"kpt-pasture/service/sso"
 	"kpt-pasture/service/wechat"
@@ -40,7 +39,6 @@ func Options() []di.HubOption {
 		redis.Module,
 		crontab.Module,
 		mqtt.Module,
-		mqtt2.Module,
 		moduleMqtt.Module,
 	}
 }

+ 7 - 7
dep/di_asynq.go

@@ -21,6 +21,13 @@ func DIAsynqWorkOrder() (out *asynqsvc.Server) {
 	return
 }
 
+// AsyncDependency is the dependency for worker and kafka
+type AsyncDependency struct {
+	dig.In
+
+	WorkOrder asynq.BizExec // BizExec 工单
+}
+
 // AsynqWorkOrder 相关消费
 func AsynqWorkOrder(dep AsyncDependency) *asynqsvc.Server {
 	cfg := config.Options()
@@ -30,10 +37,3 @@ func AsynqWorkOrder(dep AsyncDependency) *asynqsvc.Server {
 	srv.Mux.HandleFunc(pattern, dep.WorkOrder.DayWorkOrder) // 工单
 	return srv
 }
-
-// AsyncDependency is the dependency for worker and kafka
-type AsyncDependency struct {
-	dig.In
-
-	WorkOrder asynq.BizExec // BizExec 工单
-}

+ 6 - 11
dep/di_mqtt.go

@@ -1,10 +1,12 @@
 package dep
 
 import (
-	"kpt-pasture/service/mqtt2"
+	"kpt-pasture/service/mqtt"
+
+	"go.uber.org/dig"
 )
 
-/*func DIMqtt() (out MqttDependency) {
+func DIMqttService() (out MqttDependency) {
 	container := DI()
 	if err := container.Invoke(func(c MqttDependency) { out = c }); err != nil {
 		panic(err)
@@ -12,16 +14,9 @@ import (
 	return
 }
 
+// MqttDependency 依赖注入结构体
 type MqttDependency struct {
 	dig.In
 
-	DataEventEntry mqtt.DataEvent
-}*/
-
-func MqttRun() (out mqtt2.MqttServer) {
-	container := DI()
-	if err := container.Invoke(func(c mqtt2.MqttServer) { out = c }); err != nil {
-		panic(err)
-	}
-	return
+	MqttServer mqtt.IMqttServer
 }

+ 27 - 57
module/mqtt/mqtt.go → module/mqtt/handle.go

@@ -1,16 +1,11 @@
 package mqtt
 
 import (
-	"context"
 	"encoding/json"
 	"kpt-pasture/model"
 	"kpt-pasture/util"
-	"os"
-	"os/signal"
 	"strconv"
 	"sync"
-	"syscall"
-	"time"
 
 	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
 
@@ -42,7 +37,12 @@ var FrameId = map[int32]int32{
 	111: 111, 112: 112, 113: 113, 114: 114, 115: 115, 116: 116, 118: 118,
 }
 
-func (e *Entry) Subscribe(ctx context.Context, msg <-chan []byte) {
+var (
+	batchSize = 10
+	batchList = make([]*model.NeckRingOriginal, 0, batchSize)
+)
+
+func (e *Entry) NeckRingHandle(data []byte) {
 	DSMLog := &DataInsertNeckRingLog{
 		NeckRingOriginalData:   make([]*model.NeckRingOriginal, 0),
 		NeckRingErrorData:      make([]*model.NeckRingOriginal, 0),
@@ -50,64 +50,34 @@ func (e *Entry) Subscribe(ctx context.Context, msg <-chan []byte) {
 		Mx:                     &sync.RWMutex{},
 	}
 
-	batchSize := 20
-	batchList := make([]*model.NeckRingOriginal, 0, batchSize)
-
-	sc := make(chan os.Signal, 1)
-	signal.Notify(sc, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-
-	// 设置5分钟超时
-	tc := time.After(5 * time.Minute)
-
-	for {
-		select {
-		case data := <-msg:
-			newData, err := e.MsgDataFormat(data)
-			if err != nil {
-				continue
-			}
-			batchList = append(batchList, newData...)
-			if len(batchList) >= batchSize {
-				DSMLog.Mx.Lock()
-				for _, batch := range batchList {
-					// 异常脖环数据
-					if _, ok := FrameId[batch.FrameId]; !ok {
-						DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, batch)
-						continue
-					}
-
-					// 未佩戴的脖环数据
-					if ok := e.NeckRingIsBind(batch.Imei); !ok {
-						DSMLog.NeckRingUnRegisterData = append(DSMLog.NeckRingUnRegisterData, batch)
-						continue
-					}
-					// 正常脖环数据
-					DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, batch)
+	newData, _ := e.MsgDataFormat(data)
+	if newData != nil && len(newData) > 0 {
+		batchList = append(batchList, newData...)
+		if len(batchList) >= batchSize {
+			DSMLog.Mx.Lock()
+			for _, batch := range batchList {
+				// 异常脖环数据
+				if _, ok := FrameId[batch.FrameId]; !ok {
+					DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, batch)
+					continue
 				}
-
-				if err = e.CreatedData(DSMLog); err != nil {
-					zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
+				// 未佩戴的脖环数据
+				if ok := e.NeckRingIsBind(batch.Imei); !ok {
+					DSMLog.NeckRingUnRegisterData = append(DSMLog.NeckRingUnRegisterData, batch)
+					continue
 				}
-
-				DSMLog.Mx.Unlock()
-				DSMLog.NeckRingUnRegisterData = make([]*model.NeckRingOriginal, 0)
-				DSMLog.NeckRingErrorData = make([]*model.NeckRingOriginal, 0)
-				DSMLog.NeckRingOriginalData = make([]*model.NeckRingOriginal, 0)
-				batchList = batchList[:0]
-			}
-			// 优雅退出
-		case <-sc:
-			if err := e.CreatedData(DSMLog); err != nil {
-				zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
+				// 正常脖环数据
+				DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, batch)
 			}
-		case <-tc:
+
 			if err := e.CreatedData(DSMLog); err != nil {
 				zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
 			}
-			zaplog.Info("subMsgChan-os", zap.Any("success", DSMLog.NeckRingOriginalData))
+			DSMLog.Mx.Unlock()
+			DSMLog.NeckRingUnRegisterData = make([]*model.NeckRingOriginal, 0)
 			DSMLog.NeckRingErrorData = make([]*model.NeckRingOriginal, 0)
 			DSMLog.NeckRingOriginalData = make([]*model.NeckRingOriginal, 0)
-			DSMLog.NeckRingUnRegisterData = make([]*model.NeckRingOriginal, 0)
+			batchList = batchList[:0]
 		}
 	}
 }
@@ -144,7 +114,7 @@ func (e *Entry) MsgDataFormat(msg []byte) ([]*model.NeckRingOriginal, error) {
 		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
 	}
 
-	batchList := make([]*model.NeckRingOriginal, 0)
+	batchList = make([]*model.NeckRingOriginal, 0)
 	if len(neckLog.NeckRing.NeckPck) > 0 {
 		for _, v := range neckLog.NeckRing.NeckPck {
 			// 存储到数据库

+ 5 - 8
module/mqtt/interface.go

@@ -1,9 +1,7 @@
 package mqtt
 
 import (
-	"context"
 	"kpt-pasture/config"
-	"kpt-pasture/service/mqtt2"
 	"kpt-pasture/store/kptstore"
 
 	"gitee.com/xuyiping_admin/pkg/di"
@@ -12,17 +10,16 @@ import (
 
 var Module = di.Options(di.Provide(NewMqtt))
 
-func NewMqtt(entry Entry) Exec {
+func NewMqtt(entry Entry) DataHandle {
 	return &entry
 }
 
 type Entry struct {
 	dig.In
-	Cfg        *config.AppConfig
-	DB         *kptstore.DB
-	MqttClient mqtt2.MqttServer
+	Cfg *config.AppConfig
+	DB  *kptstore.DB
 }
 
-type Exec interface {
-	Subscribe(ctx context.Context, msg <-chan []byte)
+type DataHandle interface {
+	NeckRingHandle(msg []byte)
 }

+ 3 - 1
module/mqtt/sql.go

@@ -1,13 +1,15 @@
 package mqtt
 
 import (
+	"kpt-pasture/model"
+
 	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
 )
 
 // NeckRingIsBind 脖环是否绑定
 func (e *Entry) NeckRingIsBind(number string) bool {
 	var count int64 = 0
-	if err := e.DB.Where("number = ?", number).
+	if err := e.DB.Model(new(model.NeckRing)).Where("number = ?", number).
 		Where("status != ?", pasturePb.NeckRingStatus_Unbind).
 		Count(&count).Error; err != nil {
 		return false

+ 42 - 0
service/mqtt/consumer.go

@@ -0,0 +1,42 @@
+package mqtt
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+var (
+	bufferPool = sync.Pool{
+		New: func() interface{} {
+			return make([]byte, 1024) // 根据实际情况调整缓冲区大小
+		},
+	}
+	readMsgChan  = make(chan []byte, 2)
+	writeMsgChan = make(chan []byte, 2)
+)
+
+// Consumer 处理收到的消息
+func (s *IMqttClient) Consumer(handle func([]byte)) {
+	ac := time.NewTicker(2 * time.Minute)
+	var allCnt int32 = 0
+	go func() {
+		for {
+			select {
+			case <-ac.C:
+				fmt.Println("allCnt:", allCnt)
+				atomic.StoreInt32(&allCnt, 0)
+			}
+		}
+	}()
+	for {
+		select {
+		case msg := <-writeMsgChan:
+			handle(msg)
+		case rsg := <-readMsgChan:
+			writeMsgChan <- rsg
+			atomic.AddInt32(&allCnt, 1)
+		}
+	}
+}

+ 103 - 11
service/mqtt/interface.go

@@ -1,27 +1,119 @@
 package mqtt
 
 import (
+	"fmt"
 	"kpt-pasture/config"
-	"kpt-pasture/store/kptstore"
+	mqtt2 "kpt-pasture/module/mqtt"
+	"kpt-pasture/util"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	"go.uber.org/dig"
 
 	"gitee.com/xuyiping_admin/pkg/di"
+	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
 	golangMqtt "github.com/eclipse/paho.mqtt.golang"
-	"go.uber.org/dig"
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"go.uber.org/zap"
 )
 
-var Module = di.Options(di.Provide(NewDataEvent))
+var Module = di.Options(di.Provide(NewServer))
 
-type DataEvent interface {
-	NewMqtt(configOption config.MqttSetting) golangMqtt.Client
-	SubMsg(configOption config.MqttSetting, client golangMqtt.Client)
+type IMqttClient struct {
+	dig.In
+	golangMqtt.Client
+	Config  config.MqttSetting
+	MqttHub mqtt2.Entry // 处理数据
 }
 
-type DataEventEntry struct {
-	dig.In
+type IMqttServer interface {
+	Consumer(func([]byte))
+	Producer(top string, qos int32, data []byte) error
+	Run()
+	Close()
+}
+
+var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
+	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
+}
 
-	DB *kptstore.DB // DB
+var connectHandler = func(client golangMqtt.Client) {
+	zaplog.Info("connectHandler", zap.Any("client", client))
 }
 
-func NewDataEvent(entry DataEventEntry) DataEvent {
-	return &entry
+var connectLostHandler = func(client golangMqtt.Client, err error) {
+	zaplog.Error("connectLost-err", zap.Any("err", err.Error()))
+	for {
+		token := client.Connect()
+		if token.Wait() && token.Error() == nil {
+			// 成功重连,更新全局客户端实例
+			connectHandler(client)
+			break
+		}
+		zaplog.Error("ConnectionRetry", zap.Any("err", token.Error()))
+		time.Sleep(5 * time.Second)
+	}
+	zaplog.Error("connectLost-success")
+}
+
+func NewServer(config *config.AppConfig) IMqttServer {
+	conf := config.Mqtt
+	opts := golangMqtt.NewClientOptions()
+	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
+	opts.SetClientID(util.GenerateRandomNumberString(16))
+	opts.SetUsername(conf.UserName)
+	opts.SetPassword(conf.Password)
+	opts.SetCleanSession(false)
+	opts.SetConnectRetry(true)
+	opts.SetConnectRetryInterval(5 * time.Minute)
+	opts.SetKeepAlive(time.Second * time.Duration(conf.KeepAlive))
+	opts.SetAutoReconnect(conf.AutoReconnect)
+	opts.SetDefaultPublishHandler(messagePubHandler)
+	opts.OnConnect = connectHandler
+	opts.OnConnectionLost = connectLostHandler
+	client := golangMqtt.NewClient(opts)
+	if token := client.Connect(); token.Wait() && token.Error() != nil {
+		panic(token.Error())
+	}
+	return &IMqttClient{Client: client, Config: conf}
+}
+
+func (s *IMqttClient) Close() {
+	if s.Client.IsConnected() {
+		s.Client.Disconnect(250)
+	}
+}
+
+func (s *IMqttClient) Run() {
+	// 设置信号监听以优雅关闭服务器
+	stop := make(chan os.Signal, 1)
+	signal.Notify(stop, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+
+	go func() {
+		for {
+			select {
+			// 等待停止信号
+			case <-stop:
+				s.Close()
+				return
+			}
+		}
+	}()
+
+	// 订阅主题
+	buffer := bufferPool.Get().([]byte)
+	if token := s.Client.Subscribe(s.Config.SubTopic, byte(s.Config.Qos), func(client mqtt.Client, msg mqtt.Message) {
+		copy(buffer, msg.Payload())
+		select {
+		case readMsgChan <- buffer[:len(msg.Payload())]:
+			bufferPool.Put(buffer)
+		}
+	}); token.Wait() && token.Error() != nil {
+		zaplog.Error("Consumer", zap.Any("topic", s.Config.SubTopic), zap.Any("err", token.Error()))
+	}
+
+	// 启动数据处理
+	s.Consumer(s.MqttHub.NeckRingHandle)
 }

+ 2 - 2
service/mqtt2/producer.go → service/mqtt/producer.go

@@ -1,11 +1,11 @@
-package mqtt2
+package mqtt
 
 import (
 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
 	"go.uber.org/zap"
 )
 
-func (s *MqttClient) Producer(top string, qos int32, data []byte) error {
+func (s *IMqttClient) Producer(top string, qos int32, data []byte) error {
 	if token := s.Client.Publish(top, byte(qos), false, data); token.Wait() && token.Error() != nil {
 		zaplog.Error("producer", zap.String("topic", top), zap.String("data", string(data)), zap.Any("err", token.Error()))
 		return token.Error()

+ 0 - 1
service/mqtt/pub.go

@@ -1 +0,0 @@
-package mqtt

+ 0 - 255
service/mqtt/sub.go

@@ -1,255 +0,0 @@
-package mqtt
-
-import (
-	"fmt"
-	"kpt-pasture/config"
-	"kpt-pasture/util"
-	"sync"
-
-	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
-	golangMqtt "github.com/eclipse/paho.mqtt.golang"
-	"go.uber.org/zap"
-)
-
-var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
-	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
-}
-
-var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
-	zaplog.Info("connectedClient", zap.Any("client", client))
-}
-
-var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
-	zaplog.Info("connectLost", zap.Any("err", err.Error()))
-}
-
-func (d *DataEventEntry) NewMqtt(conf config.MqttSetting) golangMqtt.Client {
-	opts := golangMqtt.NewClientOptions()
-	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
-	opts.SetClientID(util.GenerateRandomNumberString(16))
-	opts.SetCleanSession(false)
-	opts.SetUsername(conf.UserName)
-	opts.SetPassword(conf.Password)
-	opts.SetAutoReconnect(conf.AutoReconnect)
-	opts.SetDefaultPublishHandler(messagePubHandler)
-	opts.OnConnect = connectHandler
-	opts.OnConnectionLost = connectLostHandler
-	client := golangMqtt.NewClient(opts)
-	if token := client.Connect(); token.Wait() && token.Error() != nil {
-		panic(token.Error())
-	}
-	return client
-}
-
-var bufferPool = sync.Pool{
-	New: func() interface{} {
-		return make([]byte, 1024) // 根据实际情况调整缓冲区大小
-	},
-}
-
-func (d *DataEventEntry) SubMsg(conf config.MqttSetting, client golangMqtt.Client) {
-	var subMsgChan = make(chan []byte, 2*conf.WorkNumber)
-	if token := client.Subscribe(conf.SubTopic, byte(conf.Qos), func(client golangMqtt.Client, msg golangMqtt.Message) {
-		buffer := bufferPool.Get().([]byte)
-		copy(buffer, msg.Payload())
-		subMsgChan <- buffer[:len(msg.Payload())]
-	}); token.Wait() && token.Error() != nil {
-		close(subMsgChan)
-		zaplog.Error("SubMsg", zap.Any("configOption", conf), zap.Any("err", token.Error()))
-		return
-	}
-
-	defer close(subMsgChan)
-	select {
-	case msg := <-subMsgChan:
-		bufferPool.Put(msg)
-		d.ProcessMessages(msg)
-	}
-}
-
-func (d *DataEventEntry) ProcessMessages(msg []byte) {
-	/*neckRingOriginalData, err := d.MsgDataFormat(msg)
-	if err != nil {
-		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
-		return
-	}
-	if neckRingOriginalData == nil {
-		return
-	}
-
-	if neckRingOriginalData.Imei == "" {
-		zaplog.Info("neckRingOriginalData", zap.Any("msg", string(msg)), zap.Any("neckRingOriginalData", neckRingOriginalData))
-		return
-	}
-
-	defer func() {
-		if time.Now().Day()%15 == 0 {
-			d.DB.Model(new(model.NeckRingOriginalData)).
-				Where("created_at < ?", time.Now().AddDate(-2, 0, 0).Unix()).
-				Delete(new(model.NeckRingOriginalData))
-			return
-		}
-	}()
-
-	// 计算牛只实际活动时间
-	nowDayTime := time.Now()
-	currHour := nowDayTime.Hour() + 2
-	frameIdHour := neckRingOriginalData.FrameId * 2
-	frameDayTime := fmt.Sprintf("%s %s:00:00", nowDayTime.Format(model.LayoutDate2), fmt.Sprintf("%02d", frameIdHour))
-	if frameIdHour > int32(currHour) {
-		frameDayTime = fmt.Sprintf("%s %s:00:00", nowDayTime.AddDate(0, 0, -1).Format(model.LayoutDate2), fmt.Sprintf("%02d", frameIdHour))
-	}
-
-	neckRingOriginalData.ActiveTime = frameDayTime
-	if err = d.DB.Create(neckRingOriginalData).Error; err != nil {
-		zaplog.Error("ProcessMessages", zap.Any("err", err), zap.Any("neckRingOriginalData", neckRingOriginalData))
-	}
-
-	// 更新脖环数据状态
-	neckRingStatus := pasturePb.NeckRingStatus_Normal
-	errorReason := ""
-	if neckRingOriginalData.FrameId >= 11 || neckRingOriginalData.FrameId < 0 {
-		neckRingStatus = pasturePb.NeckRingStatus_Error
-		errorReason = "数据异常"
-	}
-
-	d.DB.Model(new(model.NeckRingLog)).
-		Where("number = ?", neckRingOriginalData.Imei).
-		Updates(map[string]interface{}{
-			"status":       neckRingStatus,
-			"error_reason": errorReason,
-		})*/
-}
-
-/*func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.NeckRingOriginalData, error) {
-	msgData := make(map[string]interface{})
-	pairs := strings.Split(util.MsgFormat(string(msg)), " ")
-	for _, pair := range pairs {
-		parts := strings.SplitN(pair, ":", 2)
-		if len(parts) != 2 {
-			continue
-		}
-		key, value := parts[0], parts[1]
-		if len(key) == 0 {
-			continue
-		}
-		msgData[key] = value
-	}
-
-	softVer := int64(0)
-	if softVerInter, ok := msgData["SOFT_VER"]; ok {
-		if softVerstr, ok := softVerInter.(string); ok {
-			softVer, _ = strconv.ParseInt(softVerstr, 10, 64)
-		}
-	}
-
-	uuid := ""
-	if uuidInter, ok := msgData["uuid"]; ok {
-		if uuidStr, ok := uuidInter.(string); ok {
-			uuid = uuidStr
-		}
-	}
-
-	frameId := int64(0)
-	if frameIdInter, ok := msgData["frameid"]; ok {
-		if frameId64, ok := frameIdInter.(string); ok {
-			frameId, _ = strconv.ParseInt(frameId64, 10, 64)
-		}
-	}
-	cowId := ""
-	if cowIdInter, ok := msgData["cowid"]; ok {
-		if cowIdStr, ok := cowIdInter.(string); ok {
-			cowId = cowIdStr
-		}
-	}
-
-	csq := int64(0)
-	if csqInter, ok := msgData["csq"]; ok {
-		if csq32, ok := csqInter.(string); ok {
-			csq, _ = strconv.ParseInt(csq32, 10, 64)
-		}
-	}
-
-	temp := float64(0)
-	if tempInter, ok := msgData["Temp"]; ok {
-		if tempFloat, ok := tempInter.(string); ok {
-			temp, _ = strconv.ParseFloat(tempFloat, 64)
-		}
-	}
-
-	imei := ""
-	if imeiInter, ok := msgData["imei"]; ok {
-		if imeiStr, ok := imeiInter.(string); ok {
-			imei = imeiStr
-		}
-	}
-
-	active := int64(0)
-	if activeInter, ok := msgData["active"]; ok {
-		if active32, ok := activeInter.(string); ok {
-			active, _ = strconv.ParseInt(active32, 10, 64)
-		}
-	}
-
-	inAction := int64(0)
-	if inActionInter, ok := msgData["inactive"]; ok {
-		if inAction32, ok := inActionInter.(string); ok {
-			inAction, _ = strconv.ParseInt(inAction32, 10, 64)
-		}
-	}
-
-	ruMina := int64(0)
-	if ruMinaInter, ok := msgData["Rumina"]; ok {
-		if ruMina32, ok := ruMinaInter.(string); ok {
-			ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
-		}
-	}
-
-	intake := int64(0)
-	if intakeInter, ok := msgData["Intake"]; ok {
-		if intake32, ok := intakeInter.(string); ok {
-			intake, _ = strconv.ParseInt(intake32, 10, 64)
-		}
-	}
-
-	gasp := int64(0)
-	if gaspInter, ok := msgData["gasp"]; ok {
-		if gasp32, ok := gaspInter.(string); ok {
-			gasp, _ = strconv.ParseInt(gasp32, 10, 64)
-		}
-	}
-
-	other := int64(0)
-	if otherInter, ok := msgData["other"]; ok {
-		if other32, ok := otherInter.(string); ok {
-			other, _ = strconv.ParseInt(other32, 10, 64)
-		}
-	}
-
-	reMain := int64(0)
-	if reMainInter, ok := msgData["Remain"]; ok {
-		if reMain32, ok := reMainInter.(string); ok {
-			reMain, _ = strconv.ParseInt(reMain32, 10, 64)
-		}
-	}
-
-	return &model.NeckRingOriginalData{
-		SoftVer:       softVer,
-		Uuid:          uuid,
-		OriginFrameId: int32(frameId),
-		FrameId:       int32(frameId),
-		CowId:         cowId,
-		Csq:           csq,
-		Temp:          int64(temp * 100),
-		Imei:          imei,
-		Active:        int32(active),
-		InActive:      int32(inAction),
-		RuMina:        int32(ruMina),
-		Intake:        int32(intake),
-		Gasp:          int32(gasp),
-		Other:         int32(other),
-		ReMain:        int32(reMain),
-		IsShow:        pasturePb.IsShow_No,
-	}, nil
-}
-*/

+ 0 - 51
service/mqtt2/consumer.go

@@ -1,51 +0,0 @@
-package mqtt2
-
-import (
-	"os"
-	"os/signal"
-	"sync"
-	"syscall"
-	"time"
-)
-
-var (
-	bufferPool = sync.Pool{
-		New: func() interface{} {
-			return make([]byte, 1024) // 根据实际情况调整缓冲区大小
-		},
-	}
-	readMsgChan  = make(chan []byte, 2)
-	writeMsgChan = make(chan []byte, 2)
-)
-
-// Consumer 处理收到的消息
-func (s *MqttClient) Consumer(fun func([]byte)) {
-	go func() {
-		for {
-			select {
-			case msg := <-writeMsgChan:
-				fun(msg)
-			}
-		}
-	}()
-
-	sc := make(chan os.Signal, 1)
-	signal.Notify(sc, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-
-	// 设置2分钟超时
-	tc := time.After(2 * time.Minute)
-	for {
-		select {
-		case rsg := <-readMsgChan:
-			writeMsgChan <- rsg
-		case <-sc:
-			close(writeMsgChan)
-			close(readMsgChan)
-			s.Close()
-		case <-tc:
-			close(writeMsgChan)
-			close(readMsgChan)
-			s.Close()
-		}
-	}
-}

+ 0 - 94
service/mqtt2/interface.go

@@ -1,94 +0,0 @@
-package mqtt2
-
-import (
-	"fmt"
-	"kpt-pasture/config"
-	"kpt-pasture/util"
-	"time"
-
-	"gitee.com/xuyiping_admin/pkg/di"
-	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
-	golangMqtt "github.com/eclipse/paho.mqtt.golang"
-	mqtt "github.com/eclipse/paho.mqtt.golang"
-	"go.uber.org/zap"
-)
-
-var Module = di.Options(di.Provide(NewServer))
-
-type MqttClient struct {
-	golangMqtt.Client
-	Config config.MqttSetting
-}
-
-type MqttServer interface {
-	Consumer(func([]byte))
-	Producer(top string, qos int32, data []byte) error
-	Close()
-}
-
-var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
-	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
-}
-
-var connectHandler = func(client golangMqtt.Client, mqttClient *MqttClient) {
-	fmt.Println("====connectHandler=======")
-	buffer := bufferPool.Get().([]byte)
-	if token := client.Subscribe(mqttClient.Config.SubTopic, byte(mqttClient.Config.Qos), func(client mqtt.Client, msg mqtt.Message) {
-		copy(buffer, msg.Payload())
-		select {
-		case readMsgChan <- buffer[:len(msg.Payload())]:
-			fmt.Println("====buffer=======", string(buffer))
-			bufferPool.Put(buffer)
-		}
-	}); token.Wait() && token.Error() != nil {
-		zaplog.Error("Consumer", zap.Any("topic", mqttClient.Config.SubTopic), zap.Any("err", token.Error()))
-	}
-}
-
-var connectLostHandler = func(client golangMqtt.Client, err error, mqttClient *MqttClient) {
-	zaplog.Info("connectLost", zap.Any("err", err.Error()))
-	for {
-		token := client.Connect()
-		if token.Wait() && token.Error() == nil {
-			// 成功重连,更新全局客户端实例
-			connectHandler(client, mqttClient)
-			return
-		}
-		zaplog.Error("ConnectionRetry", zap.Any("err", token.Error()))
-		time.Sleep(5 * time.Second)
-	}
-}
-
-func NewServer(config *config.AppConfig) MqttServer {
-	conf := config.Mqtt
-	opts := golangMqtt.NewClientOptions()
-	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
-	opts.SetClientID(util.GenerateRandomNumberString(16))
-	opts.SetUsername(conf.UserName)
-	opts.SetPassword(conf.Password)
-	opts.SetCleanSession(false)
-	opts.SetConnectRetry(true)
-	opts.SetConnectRetryInterval(5 * time.Minute)
-	opts.SetKeepAlive(time.Second * time.Duration(conf.KeepAlive))
-	opts.SetAutoReconnect(conf.AutoReconnect)
-	opts.SetDefaultPublishHandler(messagePubHandler)
-	client := golangMqtt.NewClient(opts)
-	if token := client.Connect(); token.Wait() && token.Error() != nil {
-		panic(token.Error())
-	}
-	mqttClient := &MqttClient{Client: client, Config: conf}
-	opts.OnConnect = func(client mqtt.Client) {
-		connectHandler(client, mqttClient)
-	}
-
-	opts.OnConnectionLost = func(client mqtt.Client, err error) {
-		connectLostHandler(client, err, mqttClient)
-	}
-	return mqttClient
-}
-
-func (s *MqttClient) Close() {
-	if s.Client.IsConnected() {
-		s.Client.Disconnect(250)
-	}
-}