12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- package mqtt2
- import (
- "fmt"
- "kpt-pasture/config"
- "kpt-pasture/util"
- "time"
- "gitee.com/xuyiping_admin/pkg/di"
- "gitee.com/xuyiping_admin/pkg/logger/zaplog"
- golangMqtt "github.com/eclipse/paho.mqtt.golang"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "go.uber.org/zap"
- )
- var Module = di.Options(di.Provide(NewServer))
- type MqttClient struct {
- golangMqtt.Client
- Config config.MqttSetting
- }
// MqttServer is the interface NewServer returns to the rest of the
// application.
type MqttServer interface {
	// Consumer registers a callback that is handed raw message payloads.
	// NOTE(review): the implementation is not visible in this section;
	// confirm when and on which goroutine the callback runs.
	Consumer(func([]byte))

	// Producer publishes data to topic top with the given QoS level and
	// reports any publish error.
	// NOTE(review): implementation not visible here; confirm whether it
	// waits for the broker acknowledgement.
	Producer(top string, qos int32, data []byte) error

	// Close shuts the MQTT connection down.
	Close()
}
- var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
- zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
- }
- var connectHandler = func(client golangMqtt.Client, mqttClient *MqttClient) {
- fmt.Println("====connectHandler=======")
- buffer := bufferPool.Get().([]byte)
- if token := client.Subscribe(mqttClient.Config.SubTopic, byte(mqttClient.Config.Qos), func(client mqtt.Client, msg mqtt.Message) {
- copy(buffer, msg.Payload())
- select {
- case readMsgChan <- buffer[:len(msg.Payload())]:
- fmt.Println("====buffer=======", string(buffer))
- bufferPool.Put(buffer)
- }
- }); token.Wait() && token.Error() != nil {
- zaplog.Error("Consumer", zap.Any("topic", mqttClient.Config.SubTopic), zap.Any("err", token.Error()))
- }
- }
- var connectLostHandler = func(client golangMqtt.Client, err error, mqttClient *MqttClient) {
- zaplog.Info("connectLost", zap.Any("err", err.Error()))
- for {
- token := client.Connect()
- if token.Wait() && token.Error() == nil {
- // 成功重连,更新全局客户端实例
- connectHandler(client, mqttClient)
- return
- }
- zaplog.Error("ConnectionRetry", zap.Any("err", token.Error()))
- time.Sleep(5 * time.Second)
- }
- }
- func NewServer(config *config.AppConfig) MqttServer {
- conf := config.Mqtt
- opts := golangMqtt.NewClientOptions()
- opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
- opts.SetClientID(util.GenerateRandomNumberString(16))
- opts.SetUsername(conf.UserName)
- opts.SetPassword(conf.Password)
- opts.SetCleanSession(false)
- opts.SetConnectRetry(true)
- opts.SetConnectRetryInterval(5 * time.Minute)
- opts.SetKeepAlive(time.Second * time.Duration(conf.KeepAlive))
- opts.SetAutoReconnect(conf.AutoReconnect)
- opts.SetDefaultPublishHandler(messagePubHandler)
- client := golangMqtt.NewClient(opts)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
- mqttClient := &MqttClient{Client: client, Config: conf}
- opts.OnConnect = func(client mqtt.Client) {
- connectHandler(client, mqttClient)
- }
- opts.OnConnectionLost = func(client mqtt.Client, err error) {
- connectLostHandler(client, err, mqttClient)
- }
- return mqttClient
- }
- func (s *MqttClient) Close() {
- if s.Client.IsConnected() {
- s.Client.Disconnect(250)
- }
- }
|