12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- package mqtt2
- import (
- "fmt"
- "kpt-pasture/config"
- "gitee.com/xuyiping_admin/pkg/di"
- "gitee.com/xuyiping_admin/pkg/logger/zaplog"
- golangMqtt "github.com/eclipse/paho.mqtt.golang"
- "go.uber.org/zap"
- )
- var Module = di.Options(
- di.Provide(NewServer),
- )
- type Server interface {
- Subscribe(topic string, qos int32) []byte
- Publish(topic string, qos int32, retained bool, payload string) error
- }
- type MqttServer struct {
- client golangMqtt.Client
- Conf config.MqttSetting
- }
- func NewServer(conf config.MqttSetting) *MqttServer {
- opts := golangMqtt.NewClientOptions()
- opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
- opts.SetClientID(conf.ClientId)
- opts.SetCleanSession(false)
- opts.SetUsername(conf.UserName)
- opts.SetPassword(conf.Password)
- opts.SetAutoReconnect(conf.AutoReconnect)
- opts.SetDefaultPublishHandler(messagePubHandler)
- opts.OnConnect = connectHandler
- opts.OnConnectionLost = connectLostHandler
- client := golangMqtt.NewClient(opts)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
- return &MqttServer{client: client, Conf: conf}
- }
- var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
- zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
- }
- var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
- zaplog.Info("connectedClient", zap.Any("client", client))
- }
- var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
- zaplog.Info("connectLost", zap.Any("err", err.Error()))
- }
- func (s *MqttServer) Subscribe(topic string, qos int32, handler golangMqtt.MessageHandler) {
- if token := s.client.Subscribe(topic, byte(qos), handler); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
- }
- func (s *MqttServer) Publish(topic string, qos int32, retained bool, payload string) {
- token := s.client.Publish(topic, byte(qos), retained, payload)
- token.Wait()
- }
|