123456789101112131415161718192021222324252627282930313233343536373839404142 |
- package mqtt
- import (
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- )
// bufferPool hands out reusable byte slices of 50 KiB each.
// Adjust the buffer size to match actual message sizes.
var bufferPool = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, 50<<10)
		return buf
	},
}

// readMsgChan and writeMsgChan are the package-level message queues; each
// buffers up to five messages.
var (
	readMsgChan  = make(chan []byte, 5)
	writeMsgChan = make(chan []byte, 5)
)
- // Consumer 处理收到的消息
- func (s *IMqttClient) Consumer(handle func([]byte)) {
- tc := time.NewTicker(2 * time.Minute)
- defer tc.Stop()
- var allCnt int32 = 0
- go func() {
- for range tc.C {
- fmt.Println("allCnt:", allCnt)
- atomic.StoreInt32(&allCnt, 0)
- }
- }()
- for {
- select {
- case msg := <-writeMsgChan:
- handle(msg)
- case rsg := <-readMsgChan:
- writeMsgChan <- rsg
- atomic.AddInt32(&allCnt, 1)
- }
- }
- }
|