// consumer.go

  1. package mqtt
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
  8. var (
  9. bufferPool = sync.Pool{
  10. New: func() interface{} {
  11. return make([]byte, 1024*50) // 根据实际情况调整缓冲区大小
  12. },
  13. }
  14. readMsgChan = make(chan []byte, 5)
  15. writeMsgChan = make(chan []byte, 5)
  16. )
  17. // Consumer 处理收到的消息
  18. func (s *IMqttClient) Consumer(handle func([]byte)) {
  19. tc := time.NewTicker(2 * time.Minute)
  20. defer tc.Stop()
  21. var allCnt int32 = 0
  22. go func() {
  23. for range tc.C {
  24. fmt.Println("allCnt:", allCnt)
  25. atomic.StoreInt32(&allCnt, 0)
  26. }
  27. }()
  28. for {
  29. select {
  30. case msg := <-writeMsgChan:
  31. handle(msg)
  32. case rsg := <-readMsgChan:
  33. writeMsgChan <- rsg
  34. atomic.AddInt32(&allCnt, 1)
  35. }
  36. }
  37. }