| 123456789101112131415161718192021222324252627282930313233343536373839404142 | 
							- package mqtt
 
- import (
 
- 	"fmt"
 
- 	"sync"
 
- 	"sync/atomic"
 
- 	"time"
 
- )
 
// Package-level buffer pool and message queues.
var (
	// bufferPool recycles 50 KiB byte slices. Adjust the buffer size to
	// match the actual workload.
	bufferPool = sync.Pool{
		New: func() interface{} {
			buf := make([]byte, 50*1024)
			return buf
		},
	}

	// readMsgChan is the buffered queue (capacity 5) that Consumer
	// receives from before forwarding each message to writeMsgChan.
	readMsgChan = make(chan []byte, 5)

	// writeMsgChan is the buffered queue (capacity 5) whose messages
	// Consumer passes to its handler.
	writeMsgChan = make(chan []byte, 5)
)
 
- // Consumer 处理收到的消息
 
- func (s *IMqttClient) Consumer(handle func([]byte)) {
 
- 	tc := time.NewTicker(2 * time.Minute)
 
- 	defer tc.Stop()
 
- 	var allCnt int32 = 0
 
- 	go func() {
 
- 		for range tc.C {
 
- 			fmt.Println("allCnt:", allCnt)
 
- 			atomic.StoreInt32(&allCnt, 0)
 
- 		}
 
- 	}()
 
- 	for {
 
- 		select {
 
- 		case msg := <-writeMsgChan:
 
- 			handle(msg)
 
- 		case rsg := <-readMsgChan:
 
- 			writeMsgChan <- rsg
 
- 			atomic.AddInt32(&allCnt, 1)
 
- 		}
 
- 	}
 
- }
 
 
  |