consumer.go 995 B

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. package mqtt2
  2. import (
  3. "sync"
  4. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  5. "go.uber.org/zap"
  6. mqtt "github.com/eclipse/paho.mqtt.golang"
  7. )
  8. var bufferPool = sync.Pool{
  9. New: func() interface{} {
  10. return make([]byte, 1024) // 根据实际情况调整缓冲区大小
  11. },
  12. }
  13. // Consumer 处理收到的消息
  14. func (s *MqttClient) Consumer(topic string, qos, workNumber int32) <-chan []byte {
  15. var subMsgChan = make(chan []byte, 2*workNumber)
  16. defer close(subMsgChan)
  17. if token := s.Client.Subscribe(topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
  18. buffer := bufferPool.Get().([]byte)
  19. copy(buffer, msg.Payload())
  20. select {
  21. case subMsgChan <- buffer[:len(msg.Payload())]:
  22. bufferPool.Put(buffer)
  23. }
  24. }); token.Wait() && token.Error() != nil {
  25. close(subMsgChan)
  26. zaplog.Error("Consumer", zap.Any("topic", topic), zap.Any("err", token.Error()))
  27. return subMsgChan
  28. }
  29. go func() {
  30. for msg := range subMsgChan {
  31. bufferPool.Put(msg)
  32. }
  33. }()
  34. return subMsgChan
  35. }