client.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127
  1. /*
  2. * Copyright (c) 2013 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Seth Hoenig
  11. * Allan Stockdill-Mander
  12. * Mike Robertson
  13. */
  14. // Portions copyright © 2018 TIBCO Software Inc.
  15. // Package mqtt provides an MQTT v3.1.1 client library.
  16. package mqtt
  17. import (
  18. "bytes"
  19. "errors"
  20. "fmt"
  21. "net"
  22. "strings"
  23. "sync"
  24. "sync/atomic"
  25. "time"
  26. "github.com/eclipse/paho.mqtt.golang/packets"
  27. )
// Connection states stored in client.status.
//
// The numeric order is significant: IsConnected and internalConnLost test
// "status > connecting" to mean "reconnecting or connected".
const (
	disconnected uint32 = iota // initial state; also set by Disconnect and after a failed Connect
	connecting                 // set by Connect before the first connection attempt
	reconnecting               // set by internalConnLost just before reconnect is started
	connected                  // set by startCommsWorkers once the comms routines are being started
)
// Client is the interface definition for a Client as used by this
// library, the interface is primarily to allow mocking tests.
//
// It is an MQTT v3.1.1 client for communicating
// with an MQTT server using non-blocking methods that allow work
// to be done in the background.
// An application may connect to an MQTT server using:
//   - A plain TCP socket
//   - A secure SSL/TLS socket
//   - A websocket
//
// To enable ensured message delivery at Quality of Service (QoS) levels
// described in the MQTT spec, a message persistence mechanism must be
// used. This is done by providing a type which implements the Store
// interface. For convenience, FileStore and MemoryStore are provided
// implementations that should be sufficient for most use cases. More
// information can be found in their respective documentation.
// Numerous connection options may be specified by configuring and then
// supplying a ClientOptions type.
// Implementations of Client must be safe for concurrent use by multiple
// goroutines.
type Client interface {
	// IsConnected returns a bool signifying whether
	// the client is connected or not.
	IsConnected() bool
	// IsConnectionOpen returns a bool signifying whether the client has an active
	// connection to the MQTT broker, i.e. it is not in disconnected or reconnect mode.
	IsConnectionOpen() bool
	// Connect will create a connection to the message broker, by default
	// it will attempt to connect at v3.1.1 and auto retry at v3.1 if that
	// fails.
	Connect() Token
	// Disconnect will end the connection with the server, but not before waiting
	// the specified number of milliseconds for existing work to be
	// completed.
	Disconnect(quiesce uint)
	// Publish will publish a message with the specified QoS and content
	// to the specified topic.
	// Returns a token to track delivery of the message to the broker.
	Publish(topic string, qos byte, retained bool, payload interface{}) Token
	// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
	// a message is published on the topic provided, or nil for the default handler.
	//
	// If options.OrderMatters is true (the default) then callback must not block or
	// call functions within this package that may block (e.g. Publish) other than in
	// a new go routine.
	// callback must be safe for concurrent use by multiple goroutines.
	Subscribe(topic string, qos byte, callback MessageHandler) Token
	// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
	// be executed when a message is published on one of the topics provided, or nil for the
	// default handler.
	//
	// If options.OrderMatters is true (the default) then callback must not block or
	// call functions within this package that may block (e.g. Publish) other than in
	// a new go routine.
	// callback must be safe for concurrent use by multiple goroutines.
	SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token
	// Unsubscribe will end the subscription from each of the topics provided.
	// Messages published to those topics from other clients will no longer be
	// received.
	Unsubscribe(topics ...string) Token
	// AddRoute allows you to add a handler for messages on a specific topic
	// without making a subscription. For example having a different handler
	// for parts of a wildcard subscription or for receiving retained messages
	// upon connection (before Subscribe can be processed).
	//
	// If options.OrderMatters is true (the default) then callback must not block or
	// call functions within this package that may block (e.g. Publish) other than in
	// a new go routine.
	// callback must be safe for concurrent use by multiple goroutines.
	AddRoute(topic string, callback MessageHandler)
	// OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions
	// in use by the client.
	OptionsReader() ClientOptionsReader
}
// client implements the Client interface.
// clients are safe for concurrent use by multiple
// goroutines.
type client struct {
	lastSent        atomic.Value // time.Time - the last time a packet was successfully sent to network
	lastReceived    atomic.Value // time.Time - the last time a packet was successfully received from network
	pingOutstanding int32        // set to 1 if a ping has been sent but response not yet received
	status          uint32       // see const definitions at top of file for possible values
	sync.RWMutex                 // Protects the above two variables (note: atomic writes are also used somewhat inconsistently)
	messageIds                   // effectively a map from message id to token completor

	obound    chan *PacketAndToken // outgoing publish packet
	oboundP   chan *PacketAndToken // outgoing 'priority' packet (anything other than publish)
	msgRouter *router              // routes topics to handlers
	persist   Store                // message store; NewClient points this at options.Store
	options   ClientOptions        // private copy of the options passed to NewClient
	optionsMu sync.Mutex           // Protects the options in a few limited cases where needed for testing

	conn   net.Conn   // the network connection, must only be set with connMu locked (only used when starting/stopping workers)
	connMu sync.Mutex // mutex for the connection (again only used in two functions)

	stop         chan struct{}  // Closed to request that workers stop
	workers      sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
	commsStopped chan struct{}  // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)
}
  130. // NewClient will create an MQTT v3.1.1 client with all of the options specified
  131. // in the provided ClientOptions. The client must have the Connect method called
  132. // on it before it may be used. This is to make sure resources (such as a net
  133. // connection) are created before the application is actually ready.
  134. func NewClient(o *ClientOptions) Client {
  135. c := &client{}
  136. c.options = *o
  137. if c.options.Store == nil {
  138. c.options.Store = NewMemoryStore()
  139. }
  140. switch c.options.ProtocolVersion {
  141. case 3, 4:
  142. c.options.protocolVersionExplicit = true
  143. case 0x83, 0x84:
  144. c.options.protocolVersionExplicit = true
  145. default:
  146. c.options.ProtocolVersion = 4
  147. c.options.protocolVersionExplicit = false
  148. }
  149. c.persist = c.options.Store
  150. c.status = disconnected
  151. c.messageIds = messageIds{index: make(map[uint16]tokenCompletor)}
  152. c.msgRouter = newRouter()
  153. c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
  154. c.obound = make(chan *PacketAndToken)
  155. c.oboundP = make(chan *PacketAndToken)
  156. return c
  157. }
  158. // AddRoute allows you to add a handler for messages on a specific topic
  159. // without making a subscription. For example having a different handler
  160. // for parts of a wildcard subscription
  161. //
  162. // If options.OrderMatters is true (the default) then callback must not block or
  163. // call functions within this package that may block (e.g. Publish) other than in
  164. // a new go routine.
  165. // callback must be safe for concurrent use by multiple goroutines.
  166. func (c *client) AddRoute(topic string, callback MessageHandler) {
  167. if callback != nil {
  168. c.msgRouter.addRoute(topic, callback)
  169. }
  170. }
  171. // IsConnected returns a bool signifying whether
  172. // the client is connected or not.
  173. // connected means that the connection is up now OR it will
  174. // be established/reestablished automatically when possible
  175. func (c *client) IsConnected() bool {
  176. c.RLock()
  177. defer c.RUnlock()
  178. status := atomic.LoadUint32(&c.status)
  179. switch {
  180. case status == connected:
  181. return true
  182. case c.options.AutoReconnect && status > connecting:
  183. return true
  184. case c.options.ConnectRetry && status == connecting:
  185. return true
  186. default:
  187. return false
  188. }
  189. }
  190. // IsConnectionOpen return a bool signifying whether the client has an active
  191. // connection to mqtt broker, i.e not in disconnected or reconnect mode
  192. func (c *client) IsConnectionOpen() bool {
  193. c.RLock()
  194. defer c.RUnlock()
  195. status := atomic.LoadUint32(&c.status)
  196. switch {
  197. case status == connected:
  198. return true
  199. default:
  200. return false
  201. }
  202. }
  203. func (c *client) connectionStatus() uint32 {
  204. c.RLock()
  205. defer c.RUnlock()
  206. status := atomic.LoadUint32(&c.status)
  207. return status
  208. }
  209. func (c *client) setConnected(status uint32) {
  210. c.Lock()
  211. defer c.Unlock()
  212. atomic.StoreUint32(&c.status, status)
  213. }
// ErrNotConnected is the error returned from function calls that are
// made when the client is not connected to a broker (for example Publish
// sets it on the returned token when IsConnected reports false).
var ErrNotConnected = errors.New("not Connected")
  217. // Connect will create a connection to the message broker, by default
  218. // it will attempt to connect at v3.1.1 and auto retry at v3.1 if that
  219. // fails
  220. // Note: If using QOS1+ and CleanSession=false it is advisable to add
  221. // routes (or a DefaultPublishHandler) prior to calling Connect()
  222. // because queued messages may be delivered immediately post connection
  223. func (c *client) Connect() Token {
  224. t := newToken(packets.Connect).(*ConnectToken)
  225. DEBUG.Println(CLI, "Connect()")
  226. if c.options.ConnectRetry && atomic.LoadUint32(&c.status) != disconnected {
  227. // if in any state other than disconnected and ConnectRetry is
  228. // enabled then the connection will come up automatically
  229. // client can assume connection is up
  230. WARN.Println(CLI, "Connect() called but not disconnected")
  231. t.returnCode = packets.Accepted
  232. t.flowComplete()
  233. return t
  234. }
  235. c.persist.Open()
  236. if c.options.ConnectRetry {
  237. c.reserveStoredPublishIDs() // Reserve IDs to allow publish before connect complete
  238. }
  239. c.setConnected(connecting)
  240. go func() {
  241. if len(c.options.Servers) == 0 {
  242. t.setError(fmt.Errorf("no servers defined to connect to"))
  243. return
  244. }
  245. RETRYCONN:
  246. var conn net.Conn
  247. var rc byte
  248. var err error
  249. conn, rc, t.sessionPresent, err = c.attemptConnection()
  250. if err != nil {
  251. if c.options.ConnectRetry {
  252. DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry")
  253. time.Sleep(c.options.ConnectRetryInterval)
  254. if atomic.LoadUint32(&c.status) == connecting {
  255. goto RETRYCONN
  256. }
  257. }
  258. ERROR.Println(CLI, "Failed to connect to a broker")
  259. c.setConnected(disconnected)
  260. c.persist.Close()
  261. t.returnCode = rc
  262. t.setError(err)
  263. return
  264. }
  265. inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
  266. if c.startCommsWorkers(conn, inboundFromStore) {
  267. // Take care of any messages in the store
  268. if !c.options.CleanSession {
  269. c.resume(c.options.ResumeSubs, inboundFromStore)
  270. } else {
  271. c.persist.Reset()
  272. }
  273. } else {
  274. WARN.Println(CLI, "Connect() called but connection established in another goroutine")
  275. }
  276. close(inboundFromStore)
  277. t.flowComplete()
  278. DEBUG.Println(CLI, "exit startClient")
  279. }()
  280. return t
  281. }
  282. // internal function used to reconnect the client when it loses its connection
  283. func (c *client) reconnect() {
  284. DEBUG.Println(CLI, "enter reconnect")
  285. var (
  286. sleep = 1 * time.Second
  287. conn net.Conn
  288. )
  289. for {
  290. if nil != c.options.OnReconnecting {
  291. c.options.OnReconnecting(c, &c.options)
  292. }
  293. var err error
  294. conn, _, _, err = c.attemptConnection()
  295. if err == nil {
  296. break
  297. }
  298. DEBUG.Println(CLI, "Reconnect failed, sleeping for", int(sleep.Seconds()), "seconds:", err)
  299. time.Sleep(sleep)
  300. if sleep < c.options.MaxReconnectInterval {
  301. sleep *= 2
  302. }
  303. if sleep > c.options.MaxReconnectInterval {
  304. sleep = c.options.MaxReconnectInterval
  305. }
  306. // Disconnect may have been called
  307. if atomic.LoadUint32(&c.status) == disconnected {
  308. break
  309. }
  310. }
  311. // Disconnect() must have been called while we were trying to reconnect.
  312. if c.connectionStatus() == disconnected {
  313. if conn != nil {
  314. conn.Close()
  315. }
  316. DEBUG.Println(CLI, "Client moved to disconnected state while reconnecting, abandoning reconnect")
  317. return
  318. }
  319. inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
  320. if c.startCommsWorkers(conn, inboundFromStore) {
  321. c.resume(c.options.ResumeSubs, inboundFromStore)
  322. }
  323. close(inboundFromStore)
  324. }
// attemptConnection makes a single attempt to connect to each of the brokers.
// The protocol version to use is taken from c.options.ProtocolVersion; when the
// version was not chosen explicitly and a v3.1.1 (4) handshake is not accepted,
// the same broker is retried with v3.1 (3).
// Note: Does not set c.conn in order to minimise race conditions.
//
// Returns:
//
//	net.Conn - Connected network connection (only usable when the returned error is nil)
//	byte - Return code (packets.Accepted indicates a successful connection).
//	bool - SessionPresent flag from the connect ack (only valid if packets.Accepted)
//	err - Error; always set when the return code is not packets.Accepted. In that
//	      case conn must not be used (it may be nil or already closed).
func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
	protocolVersion := c.options.ProtocolVersion
	var (
		sessionPresent bool
		conn           net.Conn
		err            error
		rc             byte
	)

	c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases
	brokers := c.options.Servers
	c.optionsMu.Unlock()
	for _, broker := range brokers {
		cm := newConnectMsgFromOptions(&c.options, broker)
		DEBUG.Println(CLI, "about to write new connect msg")
	CONN: // re-entered via goto when falling back from protocol v3.1.1 to v3.1 for the same broker
		tlsCfg := c.options.TLSConfig
		if c.options.OnConnectAttempt != nil {
			DEBUG.Println(CLI, "using custom onConnectAttempt handler...")
			tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
		}
		// Start by opening the network connection (tcp, tls, ws) etc
		conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions)
		if err != nil {
			ERROR.Println(CLI, err.Error())
			WARN.Println(CLI, "failed to connect to broker, trying next")
			rc = packets.ErrNetworkError
			continue
		}
		DEBUG.Println(CLI, "socket connected to broker")

		// Now we perform the MQTT connection handshake
		rc, sessionPresent, err = connectMQTT(conn, cm, protocolVersion)
		if rc == packets.Accepted {
			break // successfully connected
		}

		// We may have to attempt the connection with MQTT 3.1
		if conn != nil {
			_ = conn.Close()
		}
		if !c.options.protocolVersionExplicit && protocolVersion == 4 { // try falling back to 3.1?
			DEBUG.Println(CLI, "Trying reconnect using MQTT 3.1 protocol")
			// protocolVersion is declared outside the loop and is not reset, so
			// any remaining brokers in this call are also tried with v3.1.
			protocolVersion = 3
			goto CONN
		}
		if c.options.protocolVersionExplicit { // to maintain logging from previous version
			ERROR.Println(CLI, "Connecting to", broker, "CONNACK was not CONN_ACCEPTED, but rather", packets.ConnackReturnCodes[rc])
		}
	}
	// If the connection was successful we set member variable and lock in the protocol version for future connection attempts (and users)
	if rc == packets.Accepted {
		c.options.ProtocolVersion = protocolVersion
		c.options.protocolVersionExplicit = true
	} else {
		// Maintain same error format as used previously
		if rc != packets.ErrNetworkError { // mqtt error
			err = packets.ConnErrors[rc]
		} else { // network error (if this occurred in ConnectMQTT then err will be nil)
			err = fmt.Errorf("%s : %s", packets.ConnErrors[rc], err)
		}
	}
	return conn, rc, sessionPresent, err
}
  394. // Disconnect will end the connection with the server, but not before waiting
  395. // the specified number of milliseconds to wait for existing work to be
  396. // completed.
  397. func (c *client) Disconnect(quiesce uint) {
  398. status := atomic.LoadUint32(&c.status)
  399. if status == connected {
  400. DEBUG.Println(CLI, "disconnecting")
  401. c.setConnected(disconnected)
  402. dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
  403. dt := newToken(packets.Disconnect)
  404. disconnectSent := false
  405. select {
  406. case c.oboundP <- &PacketAndToken{p: dm, t: dt}:
  407. disconnectSent = true
  408. case <-c.commsStopped:
  409. WARN.Println("Disconnect packet could not be sent because comms stopped")
  410. case <-time.After(time.Duration(quiesce) * time.Millisecond):
  411. WARN.Println("Disconnect packet not sent due to timeout")
  412. }
  413. // wait for work to finish, or quiesce time consumed
  414. if disconnectSent {
  415. DEBUG.Println(CLI, "calling WaitTimeout")
  416. dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
  417. DEBUG.Println(CLI, "WaitTimeout done")
  418. }
  419. } else {
  420. WARN.Println(CLI, "Disconnect() called but not connected (disconnected/reconnecting)")
  421. c.setConnected(disconnected)
  422. }
  423. c.disconnect()
  424. }
  425. // forceDisconnect will end the connection with the mqtt broker immediately (used for tests only)
  426. func (c *client) forceDisconnect() {
  427. if !c.IsConnected() {
  428. WARN.Println(CLI, "already disconnected")
  429. return
  430. }
  431. c.setConnected(disconnected)
  432. DEBUG.Println(CLI, "forcefully disconnecting")
  433. c.disconnect()
  434. }
  435. // disconnect cleans up after a final disconnection (user requested so no auto reconnection)
  436. func (c *client) disconnect() {
  437. done := c.stopCommsWorkers()
  438. if done != nil {
  439. <-done // Wait until the disconnect is complete (to limit chance that another connection will be started)
  440. DEBUG.Println(CLI, "forcefully disconnecting")
  441. c.messageIds.cleanUp()
  442. DEBUG.Println(CLI, "disconnected")
  443. c.persist.Close()
  444. }
  445. }
  446. // internalConnLost cleanup when connection is lost or an error occurs
  447. // Note: This function will not block
  448. func (c *client) internalConnLost(err error) {
  449. // It is possible that internalConnLost will be called multiple times simultaneously
  450. // (including after sending a DisconnectPacket) as such we only do cleanup etc if the
  451. // routines were actually running and are not being disconnected at users request
  452. DEBUG.Println(CLI, "internalConnLost called")
  453. stopDone := c.stopCommsWorkers()
  454. if stopDone != nil { // stopDone will be nil if workers already in the process of stopping or stopped
  455. go func() {
  456. DEBUG.Println(CLI, "internalConnLost waiting on workers")
  457. <-stopDone
  458. DEBUG.Println(CLI, "internalConnLost workers stopped")
  459. // It is possible that Disconnect was called which led to this error so reconnection depends upon status
  460. reconnect := c.options.AutoReconnect && c.connectionStatus() > connecting
  461. if c.options.CleanSession && !reconnect {
  462. c.messageIds.cleanUp()
  463. }
  464. if reconnect {
  465. c.setConnected(reconnecting)
  466. go c.reconnect()
  467. } else {
  468. c.setConnected(disconnected)
  469. }
  470. if c.options.OnConnectionLost != nil {
  471. go c.options.OnConnectionLost(c, err)
  472. }
  473. DEBUG.Println(CLI, "internalConnLost complete")
  474. }()
  475. }
  476. }
// startCommsWorkers is called when the connection is up.
// It starts off all of the routines needed to process incoming and outgoing messages.
// Returns true if the comms workers were started (i.e. they were not already running).
//
// conn is the established network connection; if workers are already running it
// is closed and false is returned. inboundFromStore is passed straight through
// to startComms.
func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packets.ControlPacket) bool {
	DEBUG.Println(CLI, "startCommsWorkers called")
	c.connMu.Lock()
	defer c.connMu.Unlock()
	// A non-nil c.conn means a previous call already started the workers.
	if c.conn != nil {
		WARN.Println(CLI, "startCommsWorkers called when commsworkers already running")
		conn.Close() // No use for the new network connection
		return false
	}
	c.conn = conn // Store the connection

	c.stop = make(chan struct{})
	// The keepalive worker is only started when a keep-alive interval is configured.
	if c.options.KeepAlive != 0 {
		atomic.StoreInt32(&c.pingOutstanding, 0)
		c.lastReceived.Store(time.Now())
		c.lastSent.Store(time.Now())
		c.workers.Add(1)
		go keepalive(c, conn)
	}

	// matchAndDispatch will process messages received from the network. It may generate acknowledgements
	// It will complete when incomingPubChan is closed and will close ackOut prior to exiting
	incomingPubChan := make(chan *packets.PublishPacket)
	c.workers.Add(1) // Done will be called when ackOut is closed
	ackOut := c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c)

	c.setConnected(connected)
	DEBUG.Println(CLI, "client is connected/reconnected")
	// The user's OnConnect callback runs in its own goroutine so it cannot block startup.
	if c.options.OnConnect != nil {
		go c.options.OnConnect(c)
	}

	// c.oboundP and c.obound need to stay active for the life of the client because, depending upon the options,
	// messages may be published while the client is disconnected (they will block unless in a goroutine). However
	// to keep the comms routines clean we want to shutdown the input messages it uses so create our own channels
	// and copy data across.
	commsobound := make(chan *PacketAndToken)  // outgoing publish packets
	commsoboundP := make(chan *PacketAndToken) // outgoing 'priority' packet
	c.workers.Add(1)
	// Output redirector: forwards c.oboundP, c.obound and router acknowledgements to the
	// per-connection channels until c.stop is closed.
	go func() {
		defer c.workers.Done()
		for {
			select {
			case msg := <-c.oboundP:
				commsoboundP <- msg
			case msg := <-c.obound:
				commsobound <- msg
			case msg, ok := <-ackOut:
				if !ok {
					ackOut = nil     // ignore channel going forward
					c.workers.Done() // matchAndDispatch has completed
					continue         // await next message
				}
				commsoboundP <- msg
			case <-c.stop:
				// Attempt to transmit any outstanding acknowledgements (this may well fail but should work if this is a clean disconnect)
				if ackOut != nil {
					for msg := range ackOut {
						commsoboundP <- msg
					}
					c.workers.Done() // matchAndDispatch has completed
				}
				close(commsoboundP) // Nothing sending to these channels anymore so close them and allow comms routines to exit
				close(commsobound)
				DEBUG.Println(CLI, "startCommsWorkers output redirector finished")
				return
			}
		}
	}()

	commsIncomingPub, commsErrors := startComms(c.conn, c, inboundFromStore, commsoboundP, commsobound)
	c.commsStopped = make(chan struct{})
	// Input forwarder: passes incoming publishes to the router and reports comms errors via
	// internalConnLost. It exits (closing c.commsStopped) once both comms channels are closed.
	go func() {
		for {
			// A channel is set to nil once it has been closed; stop when both are done.
			if commsIncomingPub == nil && commsErrors == nil {
				break
			}
			select {
			case pub, ok := <-commsIncomingPub:
				if !ok {
					// Incoming comms has shutdown
					close(incomingPubChan) // stop the router
					commsIncomingPub = nil
					continue
				}
				// Care is needed here because an error elsewhere could trigger a deadlock
			sendPubLoop:
				for {
					select {
					case incomingPubChan <- pub:
						break sendPubLoop
					case err, ok := <-commsErrors:
						if !ok { // commsErrors has been closed so we can ignore it
							commsErrors = nil
							continue
						}
						ERROR.Println(CLI, "Connect comms goroutine - error triggered during send Pub", err)
						c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
						continue
					}
				}
			case err, ok := <-commsErrors:
				if !ok {
					commsErrors = nil
					continue
				}
				ERROR.Println(CLI, "Connect comms goroutine - error triggered", err)
				c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
				continue
			}
		}
		DEBUG.Println(CLI, "incoming comms goroutine done")
		close(c.commsStopped)
	}()
	DEBUG.Println(CLI, "startCommsWorkers done")
	return true
}
// stopCommsWorkers - Cleanly shuts down worker go routines (including the comms routines).
// Returns nil if workers did not need to be stopped; otherwise returns a channel which will be closed when the stop is complete.
// Note: The function itself returns promptly; waiting on the returned channel may block, so do that in a
// go routine if calling from any of the comms routines.
func (c *client) stopCommsWorkers() chan struct{} {
	DEBUG.Println(CLI, "stopCommsWorkers called")
	// It is possible that this function will be called multiple times simultaneously due to the way things get shutdown
	c.connMu.Lock()
	if c.conn == nil {
		DEBUG.Println(CLI, "stopCommsWorkers done (not running)")
		c.connMu.Unlock()
		return nil
	}

	// It is important that everything is stopped in the correct order to avoid deadlocks. The main issue here is
	// the router because it both receives incoming publish messages and also sends outgoing acknowledgements. To
	// avoid issues we signal the workers to stop and close the connection (it is probably already closed but
	// there is no harm in being sure). We can then wait for the workers to finish before closing outbound comms
	// channels which will allow the comms routines to exit.

	// We stop all non-comms related workers first (ping, keepalive, errwatch, resume etc) so they don't get blocked waiting on comms
	close(c.stop)     // Signal for workers to stop
	c.conn.Close()    // Possible that this is already closed but no harm in closing again
	c.conn = nil      // Important that this is the only place that this is set to nil
	c.connMu.Unlock() // As the connection is now nil we can unlock the mu (allowing subsequent calls to exit immediately)

	doneChan := make(chan struct{})

	// The waiting is done in a separate goroutine so that this function does not block its caller.
	go func() {
		DEBUG.Println(CLI, "stopCommsWorkers waiting for workers")
		c.workers.Wait()

		// Stopping the workers will allow the comms routines to exit; we wait for these to complete
		DEBUG.Println(CLI, "stopCommsWorkers waiting for comms")
		<-c.commsStopped // wait for comms routine to stop

		DEBUG.Println(CLI, "stopCommsWorkers done")
		close(doneChan)
	}()
	return doneChan
}
  626. // Publish will publish a message with the specified QoS and content
  627. // to the specified topic.
  628. // Returns a token to track delivery of the message to the broker
  629. func (c *client) Publish(topic string, qos byte, retained bool, payload interface{}) Token {
  630. token := newToken(packets.Publish).(*PublishToken)
  631. DEBUG.Println(CLI, "enter Publish")
  632. switch {
  633. case !c.IsConnected():
  634. token.setError(ErrNotConnected)
  635. return token
  636. case c.connectionStatus() == reconnecting && qos == 0:
  637. token.flowComplete()
  638. return token
  639. }
  640. pub := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
  641. pub.Qos = qos
  642. pub.TopicName = topic
  643. pub.Retain = retained
  644. switch p := payload.(type) {
  645. case string:
  646. pub.Payload = []byte(p)
  647. case []byte:
  648. pub.Payload = p
  649. case bytes.Buffer:
  650. pub.Payload = p.Bytes()
  651. default:
  652. token.setError(fmt.Errorf("unknown payload type"))
  653. return token
  654. }
  655. if pub.Qos != 0 && pub.MessageID == 0 {
  656. mID := c.getID(token)
  657. if mID == 0 {
  658. token.setError(fmt.Errorf("no message IDs available"))
  659. return token
  660. }
  661. pub.MessageID = mID
  662. token.messageID = mID
  663. }
  664. persistOutbound(c.persist, pub)
  665. switch c.connectionStatus() {
  666. case connecting:
  667. DEBUG.Println(CLI, "storing publish message (connecting), topic:", topic)
  668. case reconnecting:
  669. DEBUG.Println(CLI, "storing publish message (reconnecting), topic:", topic)
  670. default:
  671. DEBUG.Println(CLI, "sending publish message, topic:", topic)
  672. publishWaitTimeout := c.options.WriteTimeout
  673. if publishWaitTimeout == 0 {
  674. publishWaitTimeout = time.Second * 30
  675. }
  676. select {
  677. case c.obound <- &PacketAndToken{p: pub, t: token}:
  678. case <-time.After(publishWaitTimeout):
  679. token.setError(errors.New("publish was broken by timeout"))
  680. }
  681. }
  682. return token
  683. }
  684. // Subscribe starts a new subscription. Provide a MessageHandler to be executed when
  685. // a message is published on the topic provided.
  686. //
  687. // If options.OrderMatters is true (the default) then callback must not block or
  688. // call functions within this package that may block (e.g. Publish) other than in
  689. // a new go routine.
  690. // callback must be safe for concurrent use by multiple goroutines.
  691. func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Token {
  692. token := newToken(packets.Subscribe).(*SubscribeToken)
  693. DEBUG.Println(CLI, "enter Subscribe")
  694. if !c.IsConnected() {
  695. token.setError(ErrNotConnected)
  696. return token
  697. }
  698. if !c.IsConnectionOpen() {
  699. switch {
  700. case !c.options.ResumeSubs:
  701. // if not connected and resumesubs not set this sub will be thrown away
  702. token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
  703. return token
  704. case c.options.CleanSession && c.connectionStatus() == reconnecting:
  705. // if reconnecting and cleansession is true this sub will be thrown away
  706. token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
  707. return token
  708. }
  709. }
  710. sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
  711. if err := validateTopicAndQos(topic, qos); err != nil {
  712. token.setError(err)
  713. return token
  714. }
  715. sub.Topics = append(sub.Topics, topic)
  716. sub.Qoss = append(sub.Qoss, qos)
  717. if strings.HasPrefix(topic, "$share/") {
  718. topic = strings.Join(strings.Split(topic, "/")[2:], "/")
  719. }
  720. if strings.HasPrefix(topic, "$queue/") {
  721. topic = strings.TrimPrefix(topic, "$queue/")
  722. }
  723. if callback != nil {
  724. c.msgRouter.addRoute(topic, callback)
  725. }
  726. token.subs = append(token.subs, topic)
  727. if sub.MessageID == 0 {
  728. mID := c.getID(token)
  729. if mID == 0 {
  730. token.setError(fmt.Errorf("no message IDs available"))
  731. return token
  732. }
  733. sub.MessageID = mID
  734. token.messageID = mID
  735. }
  736. DEBUG.Println(CLI, sub.String())
  737. persistOutbound(c.persist, sub)
  738. switch c.connectionStatus() {
  739. case connecting:
  740. DEBUG.Println(CLI, "storing subscribe message (connecting), topic:", topic)
  741. case reconnecting:
  742. DEBUG.Println(CLI, "storing subscribe message (reconnecting), topic:", topic)
  743. default:
  744. DEBUG.Println(CLI, "sending subscribe message, topic:", topic)
  745. subscribeWaitTimeout := c.options.WriteTimeout
  746. if subscribeWaitTimeout == 0 {
  747. subscribeWaitTimeout = time.Second * 30
  748. }
  749. select {
  750. case c.oboundP <- &PacketAndToken{p: sub, t: token}:
  751. case <-time.After(subscribeWaitTimeout):
  752. token.setError(errors.New("subscribe was broken by timeout"))
  753. }
  754. }
  755. DEBUG.Println(CLI, "exit Subscribe")
  756. return token
  757. }
  758. // SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
  759. // be executed when a message is published on one of the topics provided.
  760. //
  761. // If options.OrderMatters is true (the default) then callback must not block or
  762. // call functions within this package that may block (e.g. Publish) other than in
  763. // a new go routine.
  764. // callback must be safe for concurrent use by multiple goroutines.
  765. func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token {
  766. var err error
  767. token := newToken(packets.Subscribe).(*SubscribeToken)
  768. DEBUG.Println(CLI, "enter SubscribeMultiple")
  769. if !c.IsConnected() {
  770. token.setError(ErrNotConnected)
  771. return token
  772. }
  773. if !c.IsConnectionOpen() {
  774. switch {
  775. case !c.options.ResumeSubs:
  776. // if not connected and resumesubs not set this sub will be thrown away
  777. token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
  778. return token
  779. case c.options.CleanSession && c.connectionStatus() == reconnecting:
  780. // if reconnecting and cleansession is true this sub will be thrown away
  781. token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
  782. return token
  783. }
  784. }
  785. sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
  786. if sub.Topics, sub.Qoss, err = validateSubscribeMap(filters); err != nil {
  787. token.setError(err)
  788. return token
  789. }
  790. if callback != nil {
  791. for topic := range filters {
  792. c.msgRouter.addRoute(topic, callback)
  793. }
  794. }
  795. token.subs = make([]string, len(sub.Topics))
  796. copy(token.subs, sub.Topics)
  797. if sub.MessageID == 0 {
  798. mID := c.getID(token)
  799. if mID == 0 {
  800. token.setError(fmt.Errorf("no message IDs available"))
  801. return token
  802. }
  803. sub.MessageID = mID
  804. token.messageID = mID
  805. }
  806. persistOutbound(c.persist, sub)
  807. switch c.connectionStatus() {
  808. case connecting:
  809. DEBUG.Println(CLI, "storing subscribe message (connecting), topics:", sub.Topics)
  810. case reconnecting:
  811. DEBUG.Println(CLI, "storing subscribe message (reconnecting), topics:", sub.Topics)
  812. default:
  813. DEBUG.Println(CLI, "sending subscribe message, topics:", sub.Topics)
  814. subscribeWaitTimeout := c.options.WriteTimeout
  815. if subscribeWaitTimeout == 0 {
  816. subscribeWaitTimeout = time.Second * 30
  817. }
  818. select {
  819. case c.oboundP <- &PacketAndToken{p: sub, t: token}:
  820. case <-time.After(subscribeWaitTimeout):
  821. token.setError(errors.New("subscribe was broken by timeout"))
  822. }
  823. }
  824. DEBUG.Println(CLI, "exit SubscribeMultiple")
  825. return token
  826. }
  827. // reserveStoredPublishIDs reserves the ids for publish packets in the persistent store to ensure these are not duplicated
  828. func (c *client) reserveStoredPublishIDs() {
  829. // The resume function sets the stored id for publish packets only (some other packets
  830. // will get new ids in net code). This means that the only keys we need to ensure are
  831. // unique are the publish ones (and these will completed/replaced in resume() )
  832. if !c.options.CleanSession {
  833. storedKeys := c.persist.All()
  834. for _, key := range storedKeys {
  835. packet := c.persist.Get(key)
  836. if packet == nil {
  837. continue
  838. }
  839. switch packet.(type) {
  840. case *packets.PublishPacket:
  841. details := packet.Details()
  842. token := &PlaceHolderToken{id: details.MessageID}
  843. c.claimID(token, details.MessageID)
  844. }
  845. }
  846. }
  847. }
  848. // Load all stored messages and resend them
  849. // Call this to ensure QOS > 1,2 even after an application crash
  850. // Note: This function will exit if c.stop is closed (this allows the shutdown to proceed avoiding a potential deadlock)
  851. //
  852. func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
  853. DEBUG.Println(STR, "enter Resume")
  854. storedKeys := c.persist.All()
  855. for _, key := range storedKeys {
  856. packet := c.persist.Get(key)
  857. if packet == nil {
  858. DEBUG.Println(STR, fmt.Sprintf("resume found NIL packet (%s)", key))
  859. continue
  860. }
  861. details := packet.Details()
  862. if isKeyOutbound(key) {
  863. switch p := packet.(type) {
  864. case *packets.SubscribePacket:
  865. if subscription {
  866. DEBUG.Println(STR, fmt.Sprintf("loaded pending subscribe (%d)", details.MessageID))
  867. subPacket := packet.(*packets.SubscribePacket)
  868. token := newToken(packets.Subscribe).(*SubscribeToken)
  869. token.messageID = details.MessageID
  870. token.subs = append(token.subs, subPacket.Topics...)
  871. c.claimID(token, details.MessageID)
  872. select {
  873. case c.oboundP <- &PacketAndToken{p: packet, t: token}:
  874. case <-c.stop:
  875. DEBUG.Println(STR, "resume exiting due to stop")
  876. return
  877. }
  878. } else {
  879. c.persist.Del(key) // Unsubscribe packets should not be retained following a reconnect
  880. }
  881. case *packets.UnsubscribePacket:
  882. if subscription {
  883. DEBUG.Println(STR, fmt.Sprintf("loaded pending unsubscribe (%d)", details.MessageID))
  884. token := newToken(packets.Unsubscribe).(*UnsubscribeToken)
  885. select {
  886. case c.oboundP <- &PacketAndToken{p: packet, t: token}:
  887. case <-c.stop:
  888. DEBUG.Println(STR, "resume exiting due to stop")
  889. return
  890. }
  891. } else {
  892. c.persist.Del(key) // Unsubscribe packets should not be retained following a reconnect
  893. }
  894. case *packets.PubrelPacket:
  895. DEBUG.Println(STR, fmt.Sprintf("loaded pending pubrel (%d)", details.MessageID))
  896. select {
  897. case c.oboundP <- &PacketAndToken{p: packet, t: nil}:
  898. case <-c.stop:
  899. DEBUG.Println(STR, "resume exiting due to stop")
  900. return
  901. }
  902. case *packets.PublishPacket:
  903. // spec: If the DUP flag is set to 0, it indicates that this is the first occasion that the Client or
  904. // Server has attempted to send this MQTT PUBLISH Packet. If the DUP flag is set to 1, it indicates that
  905. // this might be re-delivery of an earlier attempt to send the Packet.
  906. //
  907. // If the message is in the store than an attempt at delivery has been made (note that the message may
  908. // never have made it onto the wire but tracking that would be complicated!).
  909. if p.Qos != 0 { // spec: The DUP flag MUST be set to 0 for all QoS 0 messages
  910. p.Dup = true
  911. }
  912. token := newToken(packets.Publish).(*PublishToken)
  913. token.messageID = details.MessageID
  914. c.claimID(token, details.MessageID)
  915. DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID))
  916. DEBUG.Println(STR, details)
  917. select {
  918. case c.obound <- &PacketAndToken{p: p, t: token}:
  919. case <-c.stop:
  920. DEBUG.Println(STR, "resume exiting due to stop")
  921. return
  922. }
  923. default:
  924. ERROR.Println(STR, "invalid message type in store (discarded)")
  925. c.persist.Del(key)
  926. }
  927. } else {
  928. switch packet.(type) {
  929. case *packets.PubrelPacket:
  930. DEBUG.Println(STR, fmt.Sprintf("loaded pending incomming (%d)", details.MessageID))
  931. select {
  932. case ibound <- packet:
  933. case <-c.stop:
  934. DEBUG.Println(STR, "resume exiting due to stop (ibound <- packet)")
  935. return
  936. }
  937. default:
  938. ERROR.Println(STR, "invalid message type in store (discarded)")
  939. c.persist.Del(key)
  940. }
  941. }
  942. }
  943. DEBUG.Println(STR, "exit resume")
  944. }
  945. // Unsubscribe will end the subscription from each of the topics provided.
  946. // Messages published to those topics from other clients will no longer be
  947. // received.
  948. func (c *client) Unsubscribe(topics ...string) Token {
  949. token := newToken(packets.Unsubscribe).(*UnsubscribeToken)
  950. DEBUG.Println(CLI, "enter Unsubscribe")
  951. if !c.IsConnected() {
  952. token.setError(ErrNotConnected)
  953. return token
  954. }
  955. if !c.IsConnectionOpen() {
  956. switch {
  957. case !c.options.ResumeSubs:
  958. // if not connected and resumesubs not set this unsub will be thrown away
  959. token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
  960. return token
  961. case c.options.CleanSession && c.connectionStatus() == reconnecting:
  962. // if reconnecting and cleansession is true this unsub will be thrown away
  963. token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
  964. return token
  965. }
  966. }
  967. unsub := packets.NewControlPacket(packets.Unsubscribe).(*packets.UnsubscribePacket)
  968. unsub.Topics = make([]string, len(topics))
  969. copy(unsub.Topics, topics)
  970. if unsub.MessageID == 0 {
  971. mID := c.getID(token)
  972. if mID == 0 {
  973. token.setError(fmt.Errorf("no message IDs available"))
  974. return token
  975. }
  976. unsub.MessageID = mID
  977. token.messageID = mID
  978. }
  979. persistOutbound(c.persist, unsub)
  980. switch c.connectionStatus() {
  981. case connecting:
  982. DEBUG.Println(CLI, "storing unsubscribe message (connecting), topics:", topics)
  983. case reconnecting:
  984. DEBUG.Println(CLI, "storing unsubscribe message (reconnecting), topics:", topics)
  985. default:
  986. DEBUG.Println(CLI, "sending unsubscribe message, topics:", topics)
  987. subscribeWaitTimeout := c.options.WriteTimeout
  988. if subscribeWaitTimeout == 0 {
  989. subscribeWaitTimeout = time.Second * 30
  990. }
  991. select {
  992. case c.oboundP <- &PacketAndToken{p: unsub, t: token}:
  993. for _, topic := range topics {
  994. c.msgRouter.deleteRoute(topic)
  995. }
  996. case <-time.After(subscribeWaitTimeout):
  997. token.setError(errors.New("unsubscribe was broken by timeout"))
  998. }
  999. }
  1000. DEBUG.Println(CLI, "exit Unsubscribe")
  1001. return token
  1002. }
  1003. // OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions
  1004. // in use by the client.
  1005. func (c *client) OptionsReader() ClientOptionsReader {
  1006. r := ClientOptionsReader{options: &c.options}
  1007. return r
  1008. }
  1009. // DefaultConnectionLostHandler is a definition of a function that simply
  1010. // reports to the DEBUG log the reason for the client losing a connection.
  1011. func DefaultConnectionLostHandler(client Client, reason error) {
  1012. DEBUG.Println("Connection lost:", reason.Error())
  1013. }
  1014. // UpdateLastReceived - Will be called whenever a packet is received off the network
  1015. // This is used by the keepalive routine to
  1016. func (c *client) UpdateLastReceived() {
  1017. if c.options.KeepAlive != 0 {
  1018. c.lastReceived.Store(time.Now())
  1019. }
  1020. }
  1021. // UpdateLastReceived - Will be called whenever a packet is successfully transmitted to the network
  1022. func (c *client) UpdateLastSent() {
  1023. if c.options.KeepAlive != 0 {
  1024. c.lastSent.Store(time.Now())
  1025. }
  1026. }
  1027. // getWriteTimeOut returns the writetimeout (duration to wait when writing to the connection) or 0 if none
  1028. func (c *client) getWriteTimeOut() time.Duration {
  1029. return c.options.WriteTimeout
  1030. }
  1031. // persistOutbound adds the packet to the outbound store
  1032. func (c *client) persistOutbound(m packets.ControlPacket) {
  1033. persistOutbound(c.persist, m)
  1034. }
  1035. // persistInbound adds the packet to the inbound store
  1036. func (c *client) persistInbound(m packets.ControlPacket) {
  1037. persistInbound(c.persist, m)
  1038. }
  1039. // pingRespReceived will be called by the network routines when a ping response is received
  1040. func (c *client) pingRespReceived() {
  1041. atomic.StoreInt32(&c.pingOutstanding, 0)
  1042. }