consumer.go 716 B

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. package mqtt
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
  8. var (
  9. bufferPool = sync.Pool{
  10. New: func() interface{} {
  11. return make([]byte, 1024) // 根据实际情况调整缓冲区大小
  12. },
  13. }
  14. readMsgChan = make(chan []byte, 2)
  15. writeMsgChan = make(chan []byte, 2)
  16. )
  17. // Consumer 处理收到的消息
  18. func (s *IMqttClient) Consumer(handle func([]byte)) {
  19. ac := time.NewTicker(2 * time.Minute)
  20. var allCnt int32 = 0
  21. go func() {
  22. for {
  23. select {
  24. case <-ac.C:
  25. fmt.Println("allCnt:", allCnt)
  26. atomic.StoreInt32(&allCnt, 0)
  27. }
  28. }
  29. }()
  30. for {
  31. select {
  32. case msg := <-writeMsgChan:
  33. handle(msg)
  34. case rsg := <-readMsgChan:
  35. writeMsgChan <- rsg
  36. atomic.AddInt32(&allCnt, 1)
  37. }
  38. }
  39. }