package mqtt2 import ( "fmt" "kpt-pasture/config" "sync" "go.uber.org/dig" "gitee.com/xuyiping_admin/pkg/di" "gitee.com/xuyiping_admin/pkg/logger/zaplog" golangMqtt "github.com/eclipse/paho.mqtt.golang" "go.uber.org/zap" ) var Module = di.Options(di.Provide(NewServer)) type MqttClient struct { dig.In Client golangMqtt.Client mx sync.Mutex } type MqttServer interface { Consumer(top string, qos, workNumber int32) <-chan []byte Producer(top string, qos int32, data []byte) error Close() } func NewServer(conf config.MqttSetting) *MqttClient { opts := golangMqtt.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port)) opts.SetClientID(conf.ClientId) opts.SetCleanSession(false) opts.SetUsername(conf.UserName) opts.SetPassword(conf.Password) opts.SetAutoReconnect(conf.AutoReconnect) opts.SetDefaultPublishHandler(messagePubHandler) opts.OnConnect = connectHandler opts.OnConnectionLost = connectLostHandler client := golangMqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } return &MqttClient{Client: client} } var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) { zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic())) } var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) { zaplog.Info("connectedClient", zap.Any("client", client)) } var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) { zaplog.Info("connectLost", zap.Any("err", err.Error())) } func (s *MqttClient) Close() { s.mx.Lock() defer s.mx.Unlock() if s.Client.IsConnected() { s.Client.Disconnect(250) } }