interface.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package mqtt2
  2. import (
  3. "fmt"
  4. "kpt-pasture/config"
  5. "sync"
  6. "go.uber.org/dig"
  7. "gitee.com/xuyiping_admin/pkg/di"
  8. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  9. golangMqtt "github.com/eclipse/paho.mqtt.golang"
  10. "go.uber.org/zap"
  11. )
  12. var Module = di.Options(di.Provide(NewServer))
  13. type MqttClient struct {
  14. dig.In
  15. Client golangMqtt.Client
  16. mx sync.Mutex
  17. }
  18. type MqttServer interface {
  19. Consumer(top string, qos, workNumber int32) <-chan []byte
  20. Producer(top string, qos int32, data []byte) error
  21. Close()
  22. }
  23. func NewServer(conf config.MqttSetting) *MqttClient {
  24. opts := golangMqtt.NewClientOptions()
  25. opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
  26. opts.SetClientID(conf.ClientId)
  27. opts.SetCleanSession(false)
  28. opts.SetUsername(conf.UserName)
  29. opts.SetPassword(conf.Password)
  30. opts.SetAutoReconnect(conf.AutoReconnect)
  31. opts.SetDefaultPublishHandler(messagePubHandler)
  32. opts.OnConnect = connectHandler
  33. opts.OnConnectionLost = connectLostHandler
  34. client := golangMqtt.NewClient(opts)
  35. if token := client.Connect(); token.Wait() && token.Error() != nil {
  36. panic(token.Error())
  37. }
  38. return &MqttClient{Client: client}
  39. }
  40. var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
  41. zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
  42. }
  43. var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
  44. zaplog.Info("connectedClient", zap.Any("client", client))
  45. }
  46. var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
  47. zaplog.Info("connectLost", zap.Any("err", err.Error()))
  48. }
  49. func (s *MqttClient) Close() {
  50. s.mx.Lock()
  51. defer s.mx.Unlock()
  52. if s.Client.IsConnected() {
  53. s.Client.Disconnect(250)
  54. }
  55. }