123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- package mqtt2
- import (
- "fmt"
- "kpt-pasture/config"
- "sync"
- "go.uber.org/dig"
- "gitee.com/xuyiping_admin/pkg/di"
- "gitee.com/xuyiping_admin/pkg/logger/zaplog"
- golangMqtt "github.com/eclipse/paho.mqtt.golang"
- "go.uber.org/zap"
- )
- var Module = di.Options(di.Provide(NewServer))
- type MqttClient struct {
- dig.In
- Client golangMqtt.Client
- mx sync.Mutex
- }
- type MqttServer interface {
- Consumer(top string, qos, workNumber int32) <-chan []byte
- Producer(top string, qos int32, data []byte) error
- Close()
- }
- func NewServer(conf config.MqttSetting) *MqttClient {
- opts := golangMqtt.NewClientOptions()
- opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
- opts.SetClientID(conf.ClientId)
- opts.SetCleanSession(false)
- opts.SetUsername(conf.UserName)
- opts.SetPassword(conf.Password)
- opts.SetAutoReconnect(conf.AutoReconnect)
- opts.SetDefaultPublishHandler(messagePubHandler)
- opts.OnConnect = connectHandler
- opts.OnConnectionLost = connectLostHandler
- client := golangMqtt.NewClient(opts)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
- return &MqttClient{Client: client}
- }
- var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
- zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
- }
- var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
- zaplog.Info("connectedClient", zap.Any("client", client))
- }
- var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
- zaplog.Info("connectLost", zap.Any("err", err.Error()))
- }
- func (s *MqttClient) Close() {
- s.mx.Lock()
- defer s.mx.Unlock()
- if s.Client.IsConnected() {
- s.Client.Disconnect(250)
- }
- }
|