|
@@ -0,0 +1,67 @@
|
|
|
+package mqtt2
|
|
|
+
|
|
|
+import (
|
|
|
+ "fmt"
|
|
|
+ "kpt-pasture/config"
|
|
|
+
|
|
|
+ "gitee.com/xuyiping_admin/pkg/di"
|
|
|
+ "gitee.com/xuyiping_admin/pkg/logger/zaplog"
|
|
|
+ golangMqtt "github.com/eclipse/paho.mqtt.golang"
|
|
|
+ "go.uber.org/zap"
|
|
|
+)
|
|
|
+
|
|
|
+var Module = di.Options(
|
|
|
+ di.Provide(NewServer),
|
|
|
+)
|
|
|
+
|
|
|
+type Server interface {
|
|
|
+ Subscribe(topic string, qos int32) []byte
|
|
|
+ Publish(topic string, qos int32, retained bool, payload string) error
|
|
|
+}
|
|
|
+
|
|
|
+type MqttServer struct {
|
|
|
+ client golangMqtt.Client
|
|
|
+ Conf config.MqttSetting
|
|
|
+}
|
|
|
+
|
|
|
+func NewServer(conf config.MqttSetting) *MqttServer {
|
|
|
+ opts := golangMqtt.NewClientOptions()
|
|
|
+ opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
|
|
|
+ opts.SetClientID(conf.ClientId)
|
|
|
+ opts.SetCleanSession(false)
|
|
|
+ opts.SetUsername(conf.UserName)
|
|
|
+ opts.SetPassword(conf.Password)
|
|
|
+ opts.SetAutoReconnect(conf.AutoReconnect)
|
|
|
+ opts.SetDefaultPublishHandler(messagePubHandler)
|
|
|
+ opts.OnConnect = connectHandler
|
|
|
+ opts.OnConnectionLost = connectLostHandler
|
|
|
+ client := golangMqtt.NewClient(opts)
|
|
|
+ if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
|
+ panic(token.Error())
|
|
|
+ }
|
|
|
+
|
|
|
+ return &MqttServer{client: client, Conf: conf}
|
|
|
+}
|
|
|
+
|
|
|
+var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
|
|
|
+ zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
|
|
|
+}
|
|
|
+
|
|
|
+var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
|
|
|
+ zaplog.Info("connectedClient", zap.Any("client", client))
|
|
|
+}
|
|
|
+
|
|
|
+var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
|
|
|
+ zaplog.Info("connectLost", zap.Any("err", err.Error()))
|
|
|
+}
|
|
|
+
|
|
|
+func (s *MqttServer) Subscribe(topic string, qos int32, handler golangMqtt.MessageHandler) {
|
|
|
+ if token := s.client.Subscribe(topic, byte(qos), handler); token.Wait() && token.Error() != nil {
|
|
|
+ panic(token.Error())
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (s *MqttServer) Publish(topic string, qos int32, retained bool, payload string) {
|
|
|
+ token := s.client.Publish(topic, byte(qos), retained, payload)
|
|
|
+ token.Wait()
|
|
|
+}
|