consumer.go 885 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package mqtt2
  2. import (
  3. "os"
  4. "os/signal"
  5. "sync"
  6. "syscall"
  7. "time"
  8. )
  // bufferPool hands out reusable byte slices. A fresh slice is 1024 bytes
  // long; adjust that size to fit the payloads actually being handled.
  var bufferPool = sync.Pool{
  	New: func() interface{} {
  		buf := make([]byte, 1024)
  		return buf
  	},
  }

  // readMsgChan is the inbound queue that Consumer drains. It buffers at most
  // two messages.
  var readMsgChan = make(chan []byte, 2)

  // writeMsgChan carries messages from Consumer's forwarding loop to the
  // goroutine that invokes the user callback. It buffers at most two messages.
  var writeMsgChan = make(chan []byte, 2)
  18. // Consumer 处理收到的消息
  19. func (s *MqttClient) Consumer(fun func([]byte)) {
  20. go func() {
  21. for {
  22. select {
  23. case msg := <-writeMsgChan:
  24. fun(msg)
  25. }
  26. }
  27. }()
  28. sc := make(chan os.Signal, 1)
  29. signal.Notify(sc, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
  30. // 设置2分钟超时
  31. tc := time.After(2 * time.Minute)
  32. for {
  33. select {
  34. case rsg := <-readMsgChan:
  35. writeMsgChan <- rsg
  36. case <-sc:
  37. close(writeMsgChan)
  38. close(readMsgChan)
  39. s.Close()
  40. case <-tc:
  41. close(writeMsgChan)
  42. close(readMsgChan)
  43. s.Close()
  44. }
  45. }
  46. }