mqtt.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package mqtt2
  2. import (
  3. "fmt"
  4. "kpt-pasture/config"
  5. "gitee.com/xuyiping_admin/pkg/di"
  6. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  7. golangMqtt "github.com/eclipse/paho.mqtt.golang"
  8. "go.uber.org/zap"
  9. )
  10. var Module = di.Options(
  11. di.Provide(NewServer),
  12. )
  // Server is the publish/subscribe contract declared by this package.
  //
  // NOTE(review): MqttServer.Subscribe in this file takes an additional
  // handler argument and returns nothing, so *MqttServer does not appear to
  // satisfy this interface as written — confirm which signature is intended.
  type Server interface {
  	// Publish sends payload to topic with the given QoS and retained flag,
  	// reporting any failure as an error.
  	Publish(topic string, qos int32, retained bool, payload string) error

  	// Subscribe takes a topic and QoS and yields a byte slice; presumably the
  	// received payload — verify against the implementations.
  	Subscribe(topic string, qos int32) []byte
  }
  17. type MqttServer struct {
  18. client golangMqtt.Client
  19. Conf config.MqttSetting
  20. }
  21. func NewServer(conf config.MqttSetting) *MqttServer {
  22. opts := golangMqtt.NewClientOptions()
  23. opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
  24. opts.SetClientID(conf.ClientId)
  25. opts.SetCleanSession(false)
  26. opts.SetUsername(conf.UserName)
  27. opts.SetPassword(conf.Password)
  28. opts.SetAutoReconnect(conf.AutoReconnect)
  29. opts.SetDefaultPublishHandler(messagePubHandler)
  30. opts.OnConnect = connectHandler
  31. opts.OnConnectionLost = connectLostHandler
  32. client := golangMqtt.NewClient(opts)
  33. if token := client.Connect(); token.Wait() && token.Error() != nil {
  34. panic(token.Error())
  35. }
  36. return &MqttServer{client: client, Conf: conf}
  37. }
  38. var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
  39. zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
  40. }
  41. var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
  42. zaplog.Info("connectedClient", zap.Any("client", client))
  43. }
  44. var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
  45. zaplog.Info("connectLost", zap.Any("err", err.Error()))
  46. }
  47. func (s *MqttServer) Subscribe(topic string, qos int32, handler golangMqtt.MessageHandler) {
  48. if token := s.client.Subscribe(topic, byte(qos), handler); token.Wait() && token.Error() != nil {
  49. panic(token.Error())
  50. }
  51. }
  52. func (s *MqttServer) Publish(topic string, qos int32, retained bool, payload string) {
  53. token := s.client.Publish(topic, byte(qos), retained, payload)
  54. token.Wait()
  55. }