mqtt.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package mqtt2
  2. import (
  3. "fmt"
  4. "kpt-pasture/config"
  5. "gitee.com/xuyiping_admin/pkg/di"
  6. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  7. golangMqtt "github.com/eclipse/paho.mqtt.golang"
  8. "go.uber.org/zap"
  9. )
  10. var Module = di.Options(di.Provide(NewServer))
  11. type MqttServer struct {
  12. Client golangMqtt.Client
  13. Conf *config.MqttSetting
  14. }
  15. func NewServer(conf config.MqttSetting) *MqttServer {
  16. opts := golangMqtt.NewClientOptions()
  17. opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
  18. opts.SetClientID(conf.ClientId)
  19. opts.SetCleanSession(false)
  20. opts.SetUsername(conf.UserName)
  21. opts.SetPassword(conf.Password)
  22. opts.SetAutoReconnect(conf.AutoReconnect)
  23. opts.SetDefaultPublishHandler(messagePubHandler)
  24. opts.OnConnect = connectHandler
  25. opts.OnConnectionLost = connectLostHandler
  26. client := golangMqtt.NewClient(opts)
  27. if token := client.Connect(); token.Wait() && token.Error() != nil {
  28. panic(token.Error())
  29. }
  30. return &MqttServer{Client: client}
  31. }
  32. var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
  33. zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
  34. }
  35. var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
  36. zaplog.Info("connectedClient", zap.Any("client", client))
  37. }
  38. var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
  39. zaplog.Info("connectLost", zap.Any("err", err.Error()))
  40. }