net.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. /*
  2. * Copyright (c) 2013 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Seth Hoenig
  11. * Allan Stockdill-Mander
  12. * Mike Robertson
  13. */
  14. package mqtt
  15. import (
  16. "errors"
  17. "io"
  18. "net"
  19. "reflect"
  20. "strings"
  21. "sync"
  22. "time"
  23. "github.com/eclipse/paho.mqtt.golang/packets"
  24. )
  25. const closedNetConnErrorText = "use of closed network connection" // error string for closed conn (https://golang.org/src/net/error_test.go)
  26. // ConnectMQTT takes a connected net.Conn and performs the initial MQTT handshake. Parameters are:
  27. // conn - Connected net.Conn
  28. // cm - Connect Packet with everything other than the protocol name/version populated (historical reasons)
  29. // protocolVersion - The protocol version to attempt to connect with
  30. //
  31. // Note that, for backward compatibility, ConnectMQTT() suppresses the actual connection error (compare to connectMQTT()).
  32. func ConnectMQTT(conn net.Conn, cm *packets.ConnectPacket, protocolVersion uint) (byte, bool) {
  33. rc, sessionPresent, _ := connectMQTT(conn, cm, protocolVersion)
  34. return rc, sessionPresent
  35. }
  36. func connectMQTT(conn io.ReadWriter, cm *packets.ConnectPacket, protocolVersion uint) (byte, bool, error) {
  37. switch protocolVersion {
  38. case 3:
  39. DEBUG.Println(CLI, "Using MQTT 3.1 protocol")
  40. cm.ProtocolName = "MQIsdp"
  41. cm.ProtocolVersion = 3
  42. case 0x83:
  43. DEBUG.Println(CLI, "Using MQTT 3.1b protocol")
  44. cm.ProtocolName = "MQIsdp"
  45. cm.ProtocolVersion = 0x83
  46. case 0x84:
  47. DEBUG.Println(CLI, "Using MQTT 3.1.1b protocol")
  48. cm.ProtocolName = "MQTT"
  49. cm.ProtocolVersion = 0x84
  50. default:
  51. DEBUG.Println(CLI, "Using MQTT 3.1.1 protocol")
  52. cm.ProtocolName = "MQTT"
  53. cm.ProtocolVersion = 4
  54. }
  55. if err := cm.Write(conn); err != nil {
  56. ERROR.Println(CLI, err)
  57. return packets.ErrNetworkError, false, err
  58. }
  59. rc, sessionPresent, err := verifyCONNACK(conn)
  60. return rc, sessionPresent, err
  61. }
  62. // This function is only used for receiving a connack
  63. // when the connection is first started.
  64. // This prevents receiving incoming data while resume
  65. // is in progress if clean session is false.
  66. func verifyCONNACK(conn io.Reader) (byte, bool, error) {
  67. DEBUG.Println(NET, "connect started")
  68. ca, err := packets.ReadPacket(conn)
  69. if err != nil {
  70. ERROR.Println(NET, "connect got error", err)
  71. return packets.ErrNetworkError, false, err
  72. }
  73. if ca == nil {
  74. ERROR.Println(NET, "received nil packet")
  75. return packets.ErrNetworkError, false, errors.New("nil CONNACK packet")
  76. }
  77. msg, ok := ca.(*packets.ConnackPacket)
  78. if !ok {
  79. ERROR.Println(NET, "received msg that was not CONNACK")
  80. return packets.ErrNetworkError, false, errors.New("non-CONNACK first packet received")
  81. }
  82. DEBUG.Println(NET, "received connack")
  83. return msg.ReturnCode, msg.SessionPresent, nil
  84. }
  85. // inbound encapsulates the output from startIncoming.
  86. // err - If != nil then an error has occurred
  87. // cp - A control packet received over the network link
  88. type inbound struct {
  89. err error
  90. cp packets.ControlPacket
  91. }
  92. // startIncoming initiates a goroutine that reads incoming messages off the wire and sends them to the channel (returned).
  93. // If there are any issues with the network connection then the returned channel will be closed and the goroutine will exit
  94. // (so closing the connection will terminate the goroutine)
  95. func startIncoming(conn io.Reader) <-chan inbound {
  96. var err error
  97. var cp packets.ControlPacket
  98. ibound := make(chan inbound)
  99. DEBUG.Println(NET, "incoming started")
  100. go func() {
  101. for {
  102. if cp, err = packets.ReadPacket(conn); err != nil {
  103. // We do not want to log the error if it is due to the network connection having been closed
  104. // elsewhere (i.e. after sending DisconnectPacket). Detecting this situation is the subject of
  105. // https://github.com/golang/go/issues/4373
  106. if !strings.Contains(err.Error(), closedNetConnErrorText) {
  107. ibound <- inbound{err: err}
  108. }
  109. close(ibound)
  110. DEBUG.Println(NET, "incoming complete")
  111. return
  112. }
  113. DEBUG.Println(NET, "startIncoming Received Message")
  114. ibound <- inbound{cp: cp}
  115. }
  116. }()
  117. return ibound
  118. }
  119. // incomingComms encapsulates the possible output of the incomingComms routine. If err != nil then an error has occurred and
  120. // the routine will have terminated; otherwise one of the other members should be non-nil
  121. type incomingComms struct {
  122. err error // If non-nil then there has been an error (ignore everything else)
  123. outbound *PacketAndToken // Packet (with token) than needs to be sent out (e.g. an acknowledgement)
  124. incomingPub *packets.PublishPacket // A new publish has been received; this will need to be passed on to our user
  125. }
  126. // startIncomingComms initiates incoming communications; this includes starting a goroutine to process incoming
  127. // messages.
  128. // Accepts a channel of inbound messages from the store (persisted messages); note this must be closed as soon as the
  129. // everything in the store has been sent.
  130. // Returns a channel that will be passed any received packets; this will be closed on a network error (and inboundFromStore closed)
  131. func startIncomingComms(conn io.Reader,
  132. c commsFns,
  133. inboundFromStore <-chan packets.ControlPacket,
  134. ) <-chan incomingComms {
  135. ibound := startIncoming(conn) // Start goroutine that reads from network connection
  136. output := make(chan incomingComms)
  137. DEBUG.Println(NET, "startIncomingComms started")
  138. go func() {
  139. for {
  140. if inboundFromStore == nil && ibound == nil {
  141. close(output)
  142. DEBUG.Println(NET, "startIncomingComms goroutine complete")
  143. return // As soon as ibound is closed we can exit (should have already processed an error)
  144. }
  145. DEBUG.Println(NET, "logic waiting for msg on ibound")
  146. var msg packets.ControlPacket
  147. var ok bool
  148. select {
  149. case msg, ok = <-inboundFromStore:
  150. if !ok {
  151. DEBUG.Println(NET, "startIncomingComms: inboundFromStore complete")
  152. inboundFromStore = nil // should happen quickly as this is only for persisted messages
  153. continue
  154. }
  155. DEBUG.Println(NET, "startIncomingComms: got msg from store")
  156. case ibMsg, ok := <-ibound:
  157. if !ok {
  158. DEBUG.Println(NET, "startIncomingComms: ibound complete")
  159. ibound = nil
  160. continue
  161. }
  162. DEBUG.Println(NET, "startIncomingComms: got msg on ibound")
  163. // If the inbound comms routine encounters any issues it will send us an error.
  164. if ibMsg.err != nil {
  165. output <- incomingComms{err: ibMsg.err}
  166. continue // Usually the channel will be closed immediately after sending an error but safer that we do not assume this
  167. }
  168. msg = ibMsg.cp
  169. c.persistInbound(msg)
  170. c.UpdateLastReceived() // Notify keepalive logic that we recently received a packet
  171. }
  172. switch m := msg.(type) {
  173. case *packets.PingrespPacket:
  174. DEBUG.Println(NET, "startIncomingComms: received pingresp")
  175. c.pingRespReceived()
  176. case *packets.SubackPacket:
  177. DEBUG.Println(NET, "startIncomingComms: received suback, id:", m.MessageID)
  178. token := c.getToken(m.MessageID)
  179. if t, ok := token.(*SubscribeToken); ok {
  180. DEBUG.Println(NET, "startIncomingComms: granted qoss", m.ReturnCodes)
  181. for i, qos := range m.ReturnCodes {
  182. t.subResult[t.subs[i]] = qos
  183. }
  184. }
  185. token.flowComplete()
  186. c.freeID(m.MessageID)
  187. case *packets.UnsubackPacket:
  188. DEBUG.Println(NET, "startIncomingComms: received unsuback, id:", m.MessageID)
  189. c.getToken(m.MessageID).flowComplete()
  190. c.freeID(m.MessageID)
  191. case *packets.PublishPacket:
  192. DEBUG.Println(NET, "startIncomingComms: received publish, msgId:", m.MessageID)
  193. output <- incomingComms{incomingPub: m}
  194. case *packets.PubackPacket:
  195. DEBUG.Println(NET, "startIncomingComms: received puback, id:", m.MessageID)
  196. c.getToken(m.MessageID).flowComplete()
  197. c.freeID(m.MessageID)
  198. case *packets.PubrecPacket:
  199. DEBUG.Println(NET, "startIncomingComms: received pubrec, id:", m.MessageID)
  200. prel := packets.NewControlPacket(packets.Pubrel).(*packets.PubrelPacket)
  201. prel.MessageID = m.MessageID
  202. output <- incomingComms{outbound: &PacketAndToken{p: prel, t: nil}}
  203. case *packets.PubrelPacket:
  204. DEBUG.Println(NET, "startIncomingComms: received pubrel, id:", m.MessageID)
  205. pc := packets.NewControlPacket(packets.Pubcomp).(*packets.PubcompPacket)
  206. pc.MessageID = m.MessageID
  207. c.persistOutbound(pc)
  208. output <- incomingComms{outbound: &PacketAndToken{p: pc, t: nil}}
  209. case *packets.PubcompPacket:
  210. DEBUG.Println(NET, "startIncomingComms: received pubcomp, id:", m.MessageID)
  211. c.getToken(m.MessageID).flowComplete()
  212. c.freeID(m.MessageID)
  213. }
  214. }
  215. }()
  216. return output
  217. }
  218. // startOutgoingComms initiates a go routine to transmit outgoing packets.
  219. // Pass in an open network connection and channels for outbound messages (including those triggered
  220. // directly from incoming comms).
  221. // Returns a channel that will receive details of any errors (closed when the goroutine exits)
  222. // This function wil only terminate when all input channels are closed
  223. func startOutgoingComms(conn net.Conn,
  224. c commsFns,
  225. oboundp <-chan *PacketAndToken,
  226. obound <-chan *PacketAndToken,
  227. oboundFromIncoming <-chan *PacketAndToken,
  228. ) <-chan error {
  229. errChan := make(chan error)
  230. DEBUG.Println(NET, "outgoing started")
  231. go func() {
  232. for {
  233. DEBUG.Println(NET, "outgoing waiting for an outbound message")
  234. // This goroutine will only exits when all of the input channels we receive on have been closed. This approach is taken to avoid any
  235. // deadlocks (if the connection goes down there are limited options as to what we can do with anything waiting on us and
  236. // throwing away the packets seems the best option)
  237. if oboundp == nil && obound == nil && oboundFromIncoming == nil {
  238. DEBUG.Println(NET, "outgoing comms stopping")
  239. close(errChan)
  240. return
  241. }
  242. select {
  243. case pub, ok := <-obound:
  244. if !ok {
  245. obound = nil
  246. continue
  247. }
  248. msg := pub.p.(*packets.PublishPacket)
  249. DEBUG.Println(NET, "obound msg to write", msg.MessageID)
  250. writeTimeout := c.getWriteTimeOut()
  251. if writeTimeout > 0 {
  252. if err := conn.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil {
  253. ERROR.Println(NET, "SetWriteDeadline ", err)
  254. }
  255. }
  256. if err := msg.Write(conn); err != nil {
  257. ERROR.Println(NET, "outgoing obound reporting error ", err)
  258. pub.t.setError(err)
  259. // report error if it's not due to the connection being closed elsewhere
  260. if !strings.Contains(err.Error(), closedNetConnErrorText) {
  261. errChan <- err
  262. }
  263. continue
  264. }
  265. if writeTimeout > 0 {
  266. // If we successfully wrote, we don't want the timeout to happen during an idle period
  267. // so we reset it to infinite.
  268. if err := conn.SetWriteDeadline(time.Time{}); err != nil {
  269. ERROR.Println(NET, "SetWriteDeadline to 0 ", err)
  270. }
  271. }
  272. if msg.Qos == 0 {
  273. pub.t.flowComplete()
  274. }
  275. DEBUG.Println(NET, "obound wrote msg, id:", msg.MessageID)
  276. case msg, ok := <-oboundp:
  277. if !ok {
  278. oboundp = nil
  279. continue
  280. }
  281. DEBUG.Println(NET, "obound priority msg to write, type", reflect.TypeOf(msg.p))
  282. if err := msg.p.Write(conn); err != nil {
  283. ERROR.Println(NET, "outgoing oboundp reporting error ", err)
  284. if msg.t != nil {
  285. msg.t.setError(err)
  286. }
  287. errChan <- err
  288. continue
  289. }
  290. if _, ok := msg.p.(*packets.DisconnectPacket); ok {
  291. msg.t.(*DisconnectToken).flowComplete()
  292. DEBUG.Println(NET, "outbound wrote disconnect, closing connection")
  293. // As per the MQTT spec "After sending a DISCONNECT Packet the Client MUST close the Network Connection"
  294. // Closing the connection will cause the goroutines to end in sequence (starting with incoming comms)
  295. conn.Close()
  296. }
  297. case msg, ok := <-oboundFromIncoming: // message triggered by an inbound message (PubrecPacket or PubrelPacket)
  298. if !ok {
  299. oboundFromIncoming = nil
  300. continue
  301. }
  302. DEBUG.Println(NET, "obound from incoming msg to write, type", reflect.TypeOf(msg.p), " ID ", msg.p.Details().MessageID)
  303. if err := msg.p.Write(conn); err != nil {
  304. ERROR.Println(NET, "outgoing oboundFromIncoming reporting error", err)
  305. if msg.t != nil {
  306. msg.t.setError(err)
  307. }
  308. errChan <- err
  309. continue
  310. }
  311. }
  312. c.UpdateLastSent() // Record that a packet has been received (for keepalive routine)
  313. }
  314. }()
  315. return errChan
  316. }
  317. // commsFns provide access to the client state (messageids, requesting disconnection and updating timing)
  318. type commsFns interface {
  319. getToken(id uint16) tokenCompletor // Retrieve the token for the specified messageid (if none then a dummy token must be returned)
  320. freeID(id uint16) // Release the specified messageid (clearing out of any persistent store)
  321. UpdateLastReceived() // Must be called whenever a packet is received
  322. UpdateLastSent() // Must be called whenever a packet is successfully sent
  323. getWriteTimeOut() time.Duration // Return the writetimeout (or 0 if none)
  324. persistOutbound(m packets.ControlPacket) // add the packet to the outbound store
  325. persistInbound(m packets.ControlPacket) // add the packet to the inbound store
  326. pingRespReceived() // Called when a ping response is received
  327. }
  328. // startComms initiates goroutines that handles communications over the network connection
  329. // Messages will be stored (via commsFns) and deleted from the store as necessary
  330. // It returns two channels:
  331. // packets.PublishPacket - Will receive publish packets received over the network.
  332. // Closed when incoming comms routines exit (on shutdown or if network link closed)
  333. // error - Any errors will be sent on this channel. The channel is closed when all comms routines have shut down
  334. //
  335. // Note: The comms routines monitoring oboundp and obound will not shutdown until those channels are both closed. Any messages received between the
  336. // connection being closed and those channels being closed will generate errors (and nothing will be sent). That way the chance of a deadlock is
  337. // minimised.
  338. func startComms(conn net.Conn, // Network connection (must be active)
  339. c commsFns, // getters and setters to enable us to cleanly interact with client
  340. inboundFromStore <-chan packets.ControlPacket, // Inbound packets from the persistence store (should be closed relatively soon after startup)
  341. oboundp <-chan *PacketAndToken,
  342. obound <-chan *PacketAndToken) (
  343. <-chan *packets.PublishPacket, // Publishpackages received over the network
  344. <-chan error, // Any errors (should generally trigger a disconnect)
  345. ) {
  346. // Start inbound comms handler; this needs to be able to transmit messages so we start a go routine to add these to the priority outbound channel
  347. ibound := startIncomingComms(conn, c, inboundFromStore)
  348. outboundFromIncoming := make(chan *PacketAndToken) // Will accept outgoing messages triggered by startIncomingComms (e.g. acknowledgements)
  349. // Start the outgoing handler. It is important to note that output from startIncomingComms is fed into startOutgoingComms (for ACK's)
  350. oboundErr := startOutgoingComms(conn, c, oboundp, obound, outboundFromIncoming)
  351. DEBUG.Println(NET, "startComms started")
  352. // Run up go routines to handle the output from the above comms functions - these are handled in separate
  353. // go routines because they can interact (e.g. ibound triggers an ACK to obound which triggers an error)
  354. var wg sync.WaitGroup
  355. wg.Add(2)
  356. outPublish := make(chan *packets.PublishPacket)
  357. outError := make(chan error)
  358. // Any messages received get passed to the appropriate channel
  359. go func() {
  360. for ic := range ibound {
  361. if ic.err != nil {
  362. outError <- ic.err
  363. continue
  364. }
  365. if ic.outbound != nil {
  366. outboundFromIncoming <- ic.outbound
  367. continue
  368. }
  369. if ic.incomingPub != nil {
  370. outPublish <- ic.incomingPub
  371. continue
  372. }
  373. ERROR.Println(STR, "startComms received empty incomingComms msg")
  374. }
  375. // Close channels that will not be written to again (allowing other routines to exit)
  376. close(outboundFromIncoming)
  377. close(outPublish)
  378. wg.Done()
  379. }()
  380. // Any errors will be passed out to our caller
  381. go func() {
  382. for err := range oboundErr {
  383. outError <- err
  384. }
  385. wg.Done()
  386. }()
  387. // outError is used by both routines so can only be closed when they are both complete
  388. go func() {
  389. wg.Wait()
  390. close(outError)
  391. DEBUG.Println(NET, "startComms closing outError")
  392. }()
  393. return outPublish, outError
  394. }
  395. // ackFunc acknowledges a packet
  396. // WARNING the function returned must not be called if the comms routine is shutting down or not running
  397. // (it needs outgoing comms in order to send the acknowledgement). Currently this is only called from
  398. // matchAndDispatch which will be shutdown before the comms are
  399. func ackFunc(oboundP chan *PacketAndToken, persist Store, packet *packets.PublishPacket) func() {
  400. return func() {
  401. switch packet.Qos {
  402. case 2:
  403. pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
  404. pr.MessageID = packet.MessageID
  405. DEBUG.Println(NET, "putting pubrec msg on obound")
  406. oboundP <- &PacketAndToken{p: pr, t: nil}
  407. DEBUG.Println(NET, "done putting pubrec msg on obound")
  408. case 1:
  409. pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
  410. pa.MessageID = packet.MessageID
  411. DEBUG.Println(NET, "putting puback msg on obound")
  412. persistOutbound(persist, pa)
  413. oboundP <- &PacketAndToken{p: pa, t: nil}
  414. DEBUG.Println(NET, "done putting puback msg on obound")
  415. case 0:
  416. // do nothing, since there is no need to send an ack packet back
  417. }
  418. }
  419. }