package mqtt import ( "fmt" "sync" "sync/atomic" "time" ) var ( bufferPool = sync.Pool{ New: func() interface{} { return make([]byte, 1024*50) // 根据实际情况调整缓冲区大小 }, } readMsgChan = make(chan []byte, 5) writeMsgChan = make(chan []byte, 5) ) // Consumer 处理收到的消息 func (s *IMqttClient) Consumer(handle func([]byte)) { tc := time.NewTicker(2 * time.Minute) defer tc.Stop() var allCnt int32 = 0 go func() { for range tc.C { fmt.Println("allCnt:", allCnt) atomic.StoreInt32(&allCnt, 0) } }() for { select { case msg := <-writeMsgChan: handle(msg) case rsg := <-readMsgChan: writeMsgChan <- rsg atomic.AddInt32(&allCnt, 1) } } }