| 123456789101112131415161718192021222324252627282930313233343536373839404142 | package mqttimport (	"fmt"	"sync"	"sync/atomic"	"time")var (	bufferPool = sync.Pool{		New: func() interface{} {			return make([]byte, 1024*50) // 根据实际情况调整缓冲区大小		},	}	readMsgChan  = make(chan []byte, 5)	writeMsgChan = make(chan []byte, 5))// Consumer 处理收到的消息func (s *IMqttClient) Consumer(handle func([]byte)) {	tc := time.NewTicker(2 * time.Minute)	defer tc.Stop()	var allCnt int32 = 0	go func() {		for range tc.C {			fmt.Println("allCnt:", allCnt)			atomic.StoreInt32(&allCnt, 0)		}	}()	for {		select {		case msg := <-writeMsgChan:			handle(msg)		case rsg := <-readMsgChan:			writeMsgChan <- rsg			atomic.AddInt32(&allCnt, 1)		}	}}
 |