12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 |
- package mqtt2
- import (
- "fmt"
- "kpt-pasture/config"
- "gitee.com/xuyiping_admin/pkg/di"
- "gitee.com/xuyiping_admin/pkg/logger/zaplog"
- golangMqtt "github.com/eclipse/paho.mqtt.golang"
- "go.uber.org/zap"
- )
- var Module = di.Options(di.Provide(NewServer))
- type MqttServer struct {
- Client golangMqtt.Client
- Conf *config.MqttSetting
- }
- func NewServer(conf config.MqttSetting) *MqttServer {
- opts := golangMqtt.NewClientOptions()
- opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
- opts.SetClientID(conf.ClientId)
- opts.SetCleanSession(false)
- opts.SetUsername(conf.UserName)
- opts.SetPassword(conf.Password)
- opts.SetAutoReconnect(conf.AutoReconnect)
- opts.SetDefaultPublishHandler(messagePubHandler)
- opts.OnConnect = connectHandler
- opts.OnConnectionLost = connectLostHandler
- client := golangMqtt.NewClient(opts)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
- return &MqttServer{Client: client}
- }
- var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
- zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
- }
- var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
- zaplog.Info("connectedClient", zap.Any("client", client))
- }
- var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
- zaplog.Info("connectLost", zap.Any("err", err.Error()))
- }
|