Browse Source

mqtt: 优化

Yi 4 months ago
parent
commit
dc049fcb91

+ 4 - 5
cmd/mqtt.go

@@ -1,7 +1,7 @@
 package cmd
 
 import (
-	"kpt-pasture/config"
+	"fmt"
 	"kpt-pasture/dep"
 
 	"github.com/spf13/cobra"
@@ -11,9 +11,8 @@ var MqttCmd = &cobra.Command{
 	Use:   "mqtt",
 	Short: "mqtt server",
 	Run: func(cmd *cobra.Command, args []string) {
-		conf := config.Options().Mqtt
-		dataEvent := dep.DIMqtt()
-		mqttClient := dataEvent.DataEventEntry.NewMqtt(conf)
-		dataEvent.DataEventEntry.SubMsg(conf, mqttClient)
+		dep.MqttRun().Consumer(func(bytes []byte) {
+			fmt.Println("====bytes====", string(bytes))
+		})
 	},
 }

+ 5 - 6
config/app.develop.yaml

@@ -45,12 +45,11 @@ cron:
   cow_pregnant: "0 00 15 * * ?"
   neck_ring: "*/30 * * * * ?"
 mqtt:
-  broker: "47.92.95.119"
-  port: 1883
-  username: ""
-  password: ""
-  client_id: "ping"
-  topic: "a"
+  broker: "kptyun.com"
+  port: 1983
+  username: "kptmqtt"
+  password: "kepaiteng"
+  sub_topic: "kptmqtt"
   qos: 0
   retain: false
   keep_alive: 60

+ 1 - 2
config/app.go

@@ -162,8 +162,7 @@ type MqttSetting struct {
 	Port              int    `json:"port" yaml:"port"`
 	UserName          string `json:"username" yaml:"username"`
 	Password          string `json:"password" yaml:"password"`
-	ClientId          string `json:"client" yaml:"client_id"`
-	Topic             string `json:"topic"  yaml:"topic"`
+	SubTopic          string `json:"sub_topic"  yaml:"sub_topic"`
 	Retain            bool   `json:"retain" yaml:"retain"`
 	Qos               int    `json:"qos" yaml:"qos"`
 	KeepAlive         int    `json:"keepAlive" yaml:"keep_alive"`

+ 5 - 6
config/app.test.yaml

@@ -44,12 +44,11 @@ cron:
   cow_pregnant: "0 00 15 * * ?"
   neck_ring: "0 30 * * * ?"
 mqtt:
-  broker: "47.92.95.119"
-  port: 1883
-  username: ""
-  password: ""
-  client_id: "ping"
-  topic: "a"
+  broker: "kptyun.com"
+  port: 1983
+  username: "kptmqtt"
+  password: "kepaiteng"
+  sub_topic: "kptmqtt"
   qos: 0
   retain: false
   keep_alive: 60

+ 2 - 1
dep/dep.go

@@ -8,6 +8,7 @@ import (
 	moduleMqtt "kpt-pasture/module/mqtt"
 	"kpt-pasture/service/asynqsvc"
 	"kpt-pasture/service/mqtt"
+	"kpt-pasture/service/mqtt2"
 	"kpt-pasture/service/redis"
 	"kpt-pasture/service/sso"
 	"kpt-pasture/service/wechat"
@@ -39,7 +40,7 @@ func Options() []di.HubOption {
 		redis.Module,
 		crontab.Module,
 		mqtt.Module,
-		//mqtt2.Module,
+		mqtt2.Module,
 		moduleMqtt.Module,
 	}
 }

+ 5 - 18
dep/di_mqtt.go

@@ -1,12 +1,10 @@
 package dep
 
 import (
-	"kpt-pasture/service/mqtt"
-
-	"go.uber.org/dig"
+	"kpt-pasture/service/mqtt2"
 )
 
-func DIMqtt() (out MqttDependency) {
+/*func DIMqtt() (out MqttDependency) {
 	container := DI()
 	if err := container.Invoke(func(c MqttDependency) { out = c }); err != nil {
 		panic(err)
@@ -18,23 +16,12 @@ type MqttDependency struct {
 	dig.In
 
 	DataEventEntry mqtt.DataEvent
-}
+}*/
 
-/*func DIMqtt() (out MqttDependency) {
+func MqttRun() (out mqtt2.MqttServer) {
 	container := DI()
-	if err := container.Provide(MqttDependency); err != nil {
-		panic(err)
-	}
-	if err := container.Invoke(func(c *mqtt2.MqttServer) { out = c }); err != nil {
+	if err := container.Invoke(func(c mqtt2.MqttServer) { out = c }); err != nil {
 		panic(err)
 	}
 	return
 }
-
-type MqttDependency struct {
-	dig.In
-
-	//DataEventEntry mqtt.DataEvent
-	MqttClient mqtt2.MqttServer
-}
-*/

+ 1 - 1
model/outbound.go

@@ -31,7 +31,7 @@ func (o *Outbound) TableName() string {
 
 func NewOutbound(req *pasturePb.OutboundApplyItem, currentUser *SystemUser) *Outbound {
 	return &Outbound{
-		Number:           fmt.Sprintf("%s%s", util.GenerateRandomNumberString(16), time.Now().Format(LayoutDate)),
+		Number:           fmt.Sprintf("%s%s", util.GenerateRandomNumberString(8), time.Now().Format(LayoutDate)),
 		OutType:          req.OutType,
 		AuditStatus:      pasturePb.AuditStatus_Pending,
 		ApplicantId:      int32(currentUser.Id),

+ 1 - 1
module/backend/calendar.go

@@ -193,7 +193,7 @@ func (s *StoreEntry) ImmunisationCowList(ctx context.Context, req *pasturePb.Ite
 				"cowId":                "牛号",
 				"penName":              "栏舍",
 				"dayAge":               "日龄",
-				"planStartTime":        "免疫开始时间",
+				"planDay":              "免疫开始时间",
 				"immunizationPlanName": "免疫名称",
 				"status":               "状态",
 			},

+ 4 - 0
module/backend/prescription.go

@@ -386,6 +386,10 @@ func (s *StoreEntry) CreatedOrUpdateImmunization(ctx context.Context, req *pastu
 		systemUser   *model.SystemUser
 	)
 
+	if req.Conditions == pasturePb.ImmunizationConditions_Other_Vaccine_After && req.ImmunizationPlanId <= 0 {
+		return xerr.Custom("请选择具体的免疫计划")
+	}
+
 	if req.ImmunizationPlanId > 0 {
 		otherImmunization, err := s.GetImmunizationById(ctx, int64(req.ImmunizationPlanId))
 		if err != nil {

+ 1 - 4
module/crontab/cow_cron.go

@@ -114,7 +114,6 @@ func (e *Entry) ImmunizationPlan() error {
 		pref := e.DB.Table(fmt.Sprintf("%s as a", new(model.Cow).TableName())).
 			Select("a.*").
 			Where("a.admission_status = ?", pasturePb.AdmissionStatus_Admission)
-
 		if plan.CowType > 0 {
 			pref.Where("a.cow_type = ?", plan.CowType)
 		}
@@ -129,6 +128,7 @@ func (e *Entry) ImmunizationPlan() error {
 				Where("a.is_pregnant = ?", pasturePb.IsShow_Ok)
 		case pasturePb.ImmunizationConditions_Month:
 			// todo 待实现月份
+			continue
 		case pasturePb.ImmunizationConditions_Admission_Days:
 			pref.Where("a.admission_age = ?", plan.Value)
 		case pasturePb.ImmunizationConditions_Other_Vaccine_After:
@@ -151,16 +151,13 @@ func (e *Entry) ImmunizationPlan() error {
 		newImmunizationPlanCowList := model.NewCowImmunizationPlanList(cowList, penList, plan)
 		newEventItemList := model.NewEventItemList(cowList, pasturePb.CalendarType_Immunisation)
 		if err := e.DB.Transaction(func(tx *gorm.DB) error {
-
 			if err := tx.Create(newImmunizationPlanCowList).Error; err != nil {
 				return xerr.WithStack(err)
 
 			}
-
 			if err := tx.Create(newEventItemList).Error; err != nil {
 				return xerr.WithStack(err)
 			}
-
 			return nil
 		}); err != nil {
 			zaplog.Error("ImmunizationPlan", zap.Any("newImmunizationPlanCowList", err), zap.Any("plan", plan), zap.Any("cowList", cowList))

+ 3 - 2
service/mqtt/sub.go

@@ -3,6 +3,7 @@ package mqtt
 import (
 	"fmt"
 	"kpt-pasture/config"
+	"kpt-pasture/util"
 	"sync"
 
 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
@@ -25,7 +26,7 @@ var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt
 func (d *DataEventEntry) NewMqtt(conf config.MqttSetting) golangMqtt.Client {
 	opts := golangMqtt.NewClientOptions()
 	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
-	opts.SetClientID(conf.ClientId)
+	opts.SetClientID(util.GenerateRandomNumberString(16))
 	opts.SetCleanSession(false)
 	opts.SetUsername(conf.UserName)
 	opts.SetPassword(conf.Password)
@@ -48,7 +49,7 @@ var bufferPool = sync.Pool{
 
 func (d *DataEventEntry) SubMsg(conf config.MqttSetting, client golangMqtt.Client) {
 	var subMsgChan = make(chan []byte, 2*conf.WorkNumber)
-	if token := client.Subscribe(conf.Topic, byte(conf.Qos), func(client golangMqtt.Client, msg golangMqtt.Message) {
+	if token := client.Subscribe(conf.SubTopic, byte(conf.Qos), func(client golangMqtt.Client, msg golangMqtt.Message) {
 		buffer := bufferPool.Get().([]byte)
 		copy(buffer, msg.Payload())
 		subMsgChan <- buffer[:len(msg.Payload())]

+ 38 - 29
service/mqtt2/consumer.go

@@ -1,42 +1,51 @@
 package mqtt2
 
 import (
+	"os"
+	"os/signal"
 	"sync"
-
-	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
-	"go.uber.org/zap"
-
-	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"syscall"
+	"time"
 )
 
-var bufferPool = sync.Pool{
-	New: func() interface{} {
-		return make([]byte, 1024) // 根据实际情况调整缓冲区大小
-	},
-}
-
-// Consumer 处理收到的消息
-func (s *MqttClient) Consumer(topic string, qos, workNumber int32) <-chan []byte {
-	var subMsgChan = make(chan []byte, 2*workNumber)
-	defer close(subMsgChan)
-	if token := s.Client.Subscribe(topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
-		buffer := bufferPool.Get().([]byte)
-		copy(buffer, msg.Payload())
-		select {
-		case subMsgChan <- buffer[:len(msg.Payload())]:
-			bufferPool.Put(buffer)
-		}
-	}); token.Wait() && token.Error() != nil {
-		close(subMsgChan)
-		zaplog.Error("Consumer", zap.Any("topic", topic), zap.Any("err", token.Error()))
-		return subMsgChan
+var (
+	bufferPool = sync.Pool{
+		New: func() interface{} {
+			return make([]byte, 1024) // 根据实际情况调整缓冲区大小
+		},
 	}
+	readMsgChan  = make(chan []byte, 2)
+	writeMsgChan = make(chan []byte, 2)
+)
 
+// Consumer 处理收到的消息
+func (s *MqttClient) Consumer(fun func([]byte)) {
 	go func() {
-		for msg := range subMsgChan {
-			bufferPool.Put(msg)
+		for {
+			select {
+			case msg := <-writeMsgChan:
+				fun(msg)
+			}
 		}
 	}()
 
-	return subMsgChan
+	sc := make(chan os.Signal, 1)
+	signal.Notify(sc, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+
+	// 设置2分钟超时
+	tc := time.After(2 * time.Minute)
+	for {
+		select {
+		case rsg := <-readMsgChan:
+			writeMsgChan <- rsg
+		case <-sc:
+			close(writeMsgChan)
+			close(readMsgChan)
+			s.Close()
+		case <-tc:
+			close(writeMsgChan)
+			close(readMsgChan)
+			s.Close()
+		}
+	}
 }

+ 54 - 26
service/mqtt2/interface.go

@@ -3,63 +3,91 @@ package mqtt2
 import (
 	"fmt"
 	"kpt-pasture/config"
-	"sync"
-
-	"go.uber.org/dig"
+	"kpt-pasture/util"
+	"time"
 
 	"gitee.com/xuyiping_admin/pkg/di"
 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
 	golangMqtt "github.com/eclipse/paho.mqtt.golang"
+	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"go.uber.org/zap"
 )
 
 var Module = di.Options(di.Provide(NewServer))
 
 type MqttClient struct {
-	dig.In
-	Client golangMqtt.Client
-	mx     sync.Mutex
+	golangMqtt.Client
+	Config config.MqttSetting
 }
 
 type MqttServer interface {
-	Consumer(top string, qos, workNumber int32) <-chan []byte
+	Consumer(func([]byte))
 	Producer(top string, qos int32, data []byte) error
 	Close()
 }
 
-func NewServer(conf config.MqttSetting) *MqttClient {
+var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
+	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
+}
+
+var connectHandler = func(client golangMqtt.Client, mqttClient *MqttClient) {
+	fmt.Println("====connectHandler=======")
+	buffer := bufferPool.Get().([]byte)
+	if token := client.Subscribe(mqttClient.Config.SubTopic, byte(mqttClient.Config.Qos), func(client mqtt.Client, msg mqtt.Message) {
+		copy(buffer, msg.Payload())
+		select {
+		case readMsgChan <- buffer[:len(msg.Payload())]:
+			fmt.Println("====buffer=======", string(buffer))
+			bufferPool.Put(buffer)
+		}
+	}); token.Wait() && token.Error() != nil {
+		zaplog.Error("Consumer", zap.Any("topic", mqttClient.Config.SubTopic), zap.Any("err", token.Error()))
+	}
+}
+
+var connectLostHandler = func(client golangMqtt.Client, err error, mqttClient *MqttClient) {
+	zaplog.Info("connectLost", zap.Any("err", err.Error()))
+	for {
+		token := client.Connect()
+		if token.Wait() && token.Error() == nil {
+			// 成功重连,更新全局客户端实例
+			connectHandler(client, mqttClient)
+			return
+		}
+		zaplog.Error("ConnectionRetry", zap.Any("err", token.Error()))
+		time.Sleep(5 * time.Second)
+	}
+}
+
+func NewServer(config *config.AppConfig) MqttServer {
+	conf := config.Mqtt
 	opts := golangMqtt.NewClientOptions()
 	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
-	opts.SetClientID(conf.ClientId)
-	opts.SetCleanSession(false)
+	opts.SetClientID(util.GenerateRandomNumberString(16))
 	opts.SetUsername(conf.UserName)
 	opts.SetPassword(conf.Password)
+	opts.SetCleanSession(false)
+	opts.SetConnectRetry(true)
+	opts.SetConnectRetryInterval(5 * time.Minute)
+	opts.SetKeepAlive(time.Second * time.Duration(conf.KeepAlive))
 	opts.SetAutoReconnect(conf.AutoReconnect)
 	opts.SetDefaultPublishHandler(messagePubHandler)
-	opts.OnConnect = connectHandler
-	opts.OnConnectionLost = connectLostHandler
 	client := golangMqtt.NewClient(opts)
 	if token := client.Connect(); token.Wait() && token.Error() != nil {
 		panic(token.Error())
 	}
-	return &MqttClient{Client: client}
-}
-
-var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
-	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
-}
-
-var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
-	zaplog.Info("connectedClient", zap.Any("client", client))
-}
+	mqttClient := &MqttClient{Client: client, Config: conf}
+	opts.OnConnect = func(client mqtt.Client) {
+		connectHandler(client, mqttClient)
+	}
 
-var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
-	zaplog.Info("connectLost", zap.Any("err", err.Error()))
+	opts.OnConnectionLost = func(client mqtt.Client, err error) {
+		connectLostHandler(client, err, mqttClient)
+	}
+	return mqttClient
 }
 
 func (s *MqttClient) Close() {
-	s.mx.Lock()
-	defer s.mx.Unlock()
 	if s.Client.IsConnected() {
 		s.Client.Disconnect(250)
 	}

+ 21 - 9
util/util.go

@@ -11,18 +11,30 @@ import (
 )
 
 const (
-	LayoutTime  = "2006-01-02 15:04:05"
-	Layout      = "2006-01-02"
-	LayoutMonth = "2006-01"
+	LayoutTime    = "2006-01-02 15:04:05"
+	Layout        = "2006-01-02"
+	LayoutMonth   = "2006-01"
+	LetterBytes   = "abcdefghijkmnpqrstuvwxyzABCDEFGHIJKLMNPQRSTUVWXYZ23456789"
+	LetterIdxBits = 6                    // 6 bits to represent a letter index
+	LetterIdxMask = 1<<LetterIdxBits - 1 // All 1-bits, as many as letterIdxBits
+	LetterIdxMax  = 63 / LetterIdxBits   // # of letter indices fitting in 63 bits
 )
 
 // GenerateRandomNumberString 生成指定长度的数字串
-func GenerateRandomNumberString(length int) string {
-	const charset = "0123456789"
-	seededRand := rand.New(rand.NewSource(time.Now().UnixNano()))
-	result := make([]byte, length)
-	for i := range result {
-		result[i] = charset[seededRand.Intn(len(charset))]
+func GenerateRandomNumberString(n int) string {
+	result := make([]byte, n)
+	// A rand.Int63() generates 63 random bits, enough for letterIdxMax characters!
+	rand.Seed(time.Now().UnixNano())
+	for i, cache, remain := n-1, rand.Int63(), LetterIdxMax; i >= 0; {
+		if remain == 0 {
+			cache, remain = rand.Int63(), LetterIdxMax
+		}
+		if idx := int(cache & LetterIdxMask); idx < len(LetterBytes) {
+			result[i] = LetterBytes[idx]
+			i--
+		}
+		cache >>= LetterIdxBits
+		remain--
 	}
 	return string(result)
 }