package mqtt2

import (
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

var (
	// bufferPool hands out 1024-byte slices; tune the size to the real
	// payloads if needed.
	bufferPool = sync.Pool{
		New: func() interface{} {
			return make([]byte, 1024)
		},
	}

	// readMsgChan is the inbound queue Consumer drains.
	readMsgChan = make(chan []byte, 2)
	// writeMsgChan carries messages from Consumer's forwarding loop to the
	// goroutine that invokes the user callback.
	writeMsgChan = make(chan []byte, 2)
)

// Consumer processes received messages. It blocks the calling goroutine,
// forwarding every message that arrives on readMsgChan to writeMsgChan, while
// a background goroutine passes each forwarded message to fun.
//
// Consumer returns after the first of these events:
//   - the process receives SIGINT, SIGHUP, SIGTERM or SIGQUIT;
//   - two minutes have elapsed since Consumer was called;
//   - readMsgChan is found to be closed.
//
// On signal or timeout both channels are closed and s.Close is called exactly
// once. Closing writeMsgChan also ends the callback goroutine once it has
// drained any buffered messages.
func (s *MqttClient) Consumer(fun func([]byte)) {
	// Ranging (rather than a bare receive in a loop) makes the goroutine exit
	// when writeMsgChan is closed instead of spinning on nil messages.
	go func() {
		for msg := range writeMsgChan {
			fun(msg)
		}
	}()

	sc := make(chan os.Signal, 1)
	// os.Kill (SIGKILL) is deliberately not listed: it cannot be caught, so
	// registering it has no effect. os.Interrupt already covers SIGINT.
	signal.Notify(sc, os.Interrupt, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT)
	defer signal.Stop(sc)

	// Overall 2-minute limit, measured from the start of Consumer.
	timeout := time.NewTimer(2 * time.Minute)
	defer timeout.Stop()

	shutdown := func() {
		close(writeMsgChan)
		close(readMsgChan)
		s.Close()
	}

	for {
		select {
		case msg, ok := <-readMsgChan:
			if !ok {
				// readMsgChan is already closed (e.g. by an earlier
				// shutdown); forwarding would send on a closed channel.
				return
			}
			writeMsgChan <- msg
		case <-sc:
			shutdown()
			// Returning is essential: looping again would receive nil from
			// the closed readMsgChan and panic sending on writeMsgChan.
			return
		case <-timeout.C:
			shutdown()
			return
		}
	}
}