package mqtt2

import (
	"sync"

	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"go.uber.org/zap"
)

// bufferPool hands out reusable 1 KiB byte slices; adjust the buffer size to
// the actual workload.
//
// Consumer does not draw from this pool: every payload it delivers is owned by
// the receiver, and the returned channel offers no way to hand a buffer back,
// so recycling those slices would let a later message overwrite data the
// receiver is still reading.
var bufferPool = sync.Pool{
	New: func() interface{} {
		return make([]byte, 1024)
	},
}

// Consumer subscribes to topic with the given QoS and returns a receive-only
// channel on which a private copy of every received payload is delivered.
//
// The channel is buffered with room for 2*workNumber messages; a negative
// workNumber is treated as zero (unbuffered). When the buffer is full the
// subscription callback blocks until a receiver catches up.
//
// If the subscription fails, the error is logged and the returned channel is
// already closed, so a caller ranging over it terminates immediately. On
// success the channel is never closed by this method: it stays open for as
// long as the subscription callback may deliver messages.
func (s *MqttClient) Consumer(topic string, qos, workNumber int32) <-chan []byte {
	// Widen before multiplying so 2*workNumber cannot overflow int32, and clamp
	// so that make does not panic on a negative size.
	capacity := 2 * int(workNumber)
	if capacity < 0 {
		capacity = 0
	}
	subMsgChan := make(chan []byte, capacity)

	token := s.Client.Subscribe(topic, byte(qos), func(_ mqtt.Client, msg mqtt.Message) {
		// Copy into a slice sized to the payload: this supports payloads of
		// any length and gives the receiver exclusive ownership of the data.
		payload := msg.Payload()
		data := make([]byte, len(payload))
		copy(data, payload)
		subMsgChan <- data
	})
	if token.Wait() && token.Error() != nil {
		zaplog.Error("Consumer", zap.Any("topic", topic), zap.Any("err", token.Error()))
		// Close exactly once to signal that no message will ever arrive.
		close(subMsgChan)
		return subMsgChan
	}

	return subMsgChan
}