package mqtt2 import ( "fmt" "kpt-pasture/config" "gitee.com/xuyiping_admin/pkg/di" "gitee.com/xuyiping_admin/pkg/logger/zaplog" golangMqtt "github.com/eclipse/paho.mqtt.golang" "go.uber.org/zap" ) var Module = di.Options(di.Provide(NewServer)) type MqttServer struct { Client golangMqtt.Client Conf *config.MqttSetting } func NewServer(conf config.MqttSetting) *MqttServer { opts := golangMqtt.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port)) opts.SetClientID(conf.ClientId) opts.SetCleanSession(false) opts.SetUsername(conf.UserName) opts.SetPassword(conf.Password) opts.SetAutoReconnect(conf.AutoReconnect) opts.SetDefaultPublishHandler(messagePubHandler) opts.OnConnect = connectHandler opts.OnConnectionLost = connectLostHandler client := golangMqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } return &MqttServer{Client: client} } var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) { zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic())) } var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) { zaplog.Info("connectedClient", zap.Any("client", client)) } var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) { zaplog.Info("connectLost", zap.Any("err", err.Error())) }