123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- package mqtt
- import (
- "fmt"
- "kpt-pasture/config"
- "sync"
- "gitee.com/xuyiping_admin/pkg/logger/zaplog"
- golangMqtt "github.com/eclipse/paho.mqtt.golang"
- "go.uber.org/zap"
- )
- var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
- zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
- }
- var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
- zaplog.Info("connectedClient", zap.Any("client", client))
- }
- var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
- zaplog.Info("connectLost", zap.Any("err", err.Error()))
- }
- func (d *DataEventEntry) NewMqtt(conf config.MqttSetting) golangMqtt.Client {
- opts := golangMqtt.NewClientOptions()
- opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
- opts.SetClientID(conf.ClientId)
- opts.SetCleanSession(false)
- opts.SetUsername(conf.UserName)
- opts.SetPassword(conf.Password)
- opts.SetAutoReconnect(conf.AutoReconnect)
- opts.SetDefaultPublishHandler(messagePubHandler)
- opts.OnConnect = connectHandler
- opts.OnConnectionLost = connectLostHandler
- client := golangMqtt.NewClient(opts)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
- return client
- }
- var bufferPool = sync.Pool{
- New: func() interface{} {
- return make([]byte, 1024)
- },
- }
- func (d *DataEventEntry) SubMsg(conf config.MqttSetting, client golangMqtt.Client) {
- var subMsgChan = make(chan []byte, 2*conf.WorkNumber)
- if token := client.Subscribe(conf.Topic, byte(conf.Qos), func(client golangMqtt.Client, msg golangMqtt.Message) {
- buffer := bufferPool.Get().([]byte)
- copy(buffer, msg.Payload())
- subMsgChan <- buffer[:len(msg.Payload())]
- }); token.Wait() && token.Error() != nil {
- close(subMsgChan)
- zaplog.Error("SubMsg", zap.Any("configOption", conf), zap.Any("err", token.Error()))
- return
- }
- defer close(subMsgChan)
- select {
- case msg := <-subMsgChan:
- bufferPool.Put(msg)
- d.ProcessMessages(msg)
- }
- }
- func (d *DataEventEntry) ProcessMessages(msg []byte) {
-
- }
|