123456789101112131415161718192021222324252627282930313233343536373839404142 |
- package mqtt2
- import (
- "sync"
- "gitee.com/xuyiping_admin/pkg/logger/zaplog"
- "go.uber.org/zap"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- )
- var bufferPool = sync.Pool{
- New: func() interface{} {
- return make([]byte, 1024) // 根据实际情况调整缓冲区大小
- },
- }
- // Consumer 处理收到的消息
- func (s *MqttClient) Consumer(topic string, qos, workNumber int32) <-chan []byte {
- var subMsgChan = make(chan []byte, 2*workNumber)
- defer close(subMsgChan)
- if token := s.Client.Subscribe(topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
- buffer := bufferPool.Get().([]byte)
- copy(buffer, msg.Payload())
- select {
- case subMsgChan <- buffer[:len(msg.Payload())]:
- bufferPool.Put(buffer)
- }
- }); token.Wait() && token.Error() != nil {
- close(subMsgChan)
- zaplog.Error("Consumer", zap.Any("topic", topic), zap.Any("err", token.Error()))
- return subMsgChan
- }
- go func() {
- for msg := range subMsgChan {
- bufferPool.Put(msg)
- }
- }()
- return subMsgChan
- }
|