options.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. /*
  2. * Copyright (c) 2013 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Seth Hoenig
  11. * Allan Stockdill-Mander
  12. * Mike Robertson
  13. * Måns Ansgariusson
  14. */
  15. // Portions copyright © 2018 TIBCO Software Inc.
  16. package mqtt
import (
	"crypto/tls"
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"time"
)
  24. // CredentialsProvider allows the username and password to be updated
  25. // before reconnecting. It should return the current username and password.
  26. type CredentialsProvider func() (username string, password string)
  27. // MessageHandler is a callback type which can be set to be
  28. // executed upon the arrival of messages published to topics
  29. // to which the client is subscribed.
  30. type MessageHandler func(Client, Message)
  31. // ConnectionLostHandler is a callback type which can be set to be
  32. // executed upon an unintended disconnection from the MQTT broker.
  33. // Disconnects caused by calling Disconnect or ForceDisconnect will
  34. // not cause an OnConnectionLost callback to execute.
  35. type ConnectionLostHandler func(Client, error)
  36. // OnConnectHandler is a callback that is called when the client
  37. // state changes from unconnected/disconnected to connected. Both
  38. // at initial connection and on reconnection
  39. type OnConnectHandler func(Client)
  40. // ReconnectHandler is invoked prior to reconnecting after
  41. // the initial connection is lost
  42. type ReconnectHandler func(Client, *ClientOptions)
  43. // ConnectionAttemptHandler is invoked prior to making the initial connection.
  44. type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Config
  45. // ClientOptions contains configurable options for an Client. Note that these should be set using the
  46. // relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage.
  47. type ClientOptions struct {
  48. Servers []*url.URL
  49. ClientID string
  50. Username string
  51. Password string
  52. CredentialsProvider CredentialsProvider
  53. CleanSession bool
  54. Order bool
  55. WillEnabled bool
  56. WillTopic string
  57. WillPayload []byte
  58. WillQos byte
  59. WillRetained bool
  60. ProtocolVersion uint
  61. protocolVersionExplicit bool
  62. TLSConfig *tls.Config
  63. KeepAlive int64
  64. PingTimeout time.Duration
  65. ConnectTimeout time.Duration
  66. MaxReconnectInterval time.Duration
  67. AutoReconnect bool
  68. ConnectRetryInterval time.Duration
  69. ConnectRetry bool
  70. Store Store
  71. DefaultPublishHandler MessageHandler
  72. OnConnect OnConnectHandler
  73. OnConnectionLost ConnectionLostHandler
  74. OnReconnecting ReconnectHandler
  75. OnConnectAttempt ConnectionAttemptHandler
  76. WriteTimeout time.Duration
  77. MessageChannelDepth uint
  78. ResumeSubs bool
  79. HTTPHeaders http.Header
  80. WebsocketOptions *WebsocketOptions
  81. }
  82. // NewClientOptions will create a new ClientClientOptions type with some
  83. // default values.
  84. // Port: 1883
  85. // CleanSession: True
  86. // Order: True (note: it is recommended that this be set to FALSE unless order is important)
  87. // KeepAlive: 30 (seconds)
  88. // ConnectTimeout: 30 (seconds)
  89. // MaxReconnectInterval 10 (minutes)
  90. // AutoReconnect: True
  91. func NewClientOptions() *ClientOptions {
  92. o := &ClientOptions{
  93. Servers: nil,
  94. ClientID: "",
  95. Username: "",
  96. Password: "",
  97. CleanSession: true,
  98. Order: true,
  99. WillEnabled: false,
  100. WillTopic: "",
  101. WillPayload: nil,
  102. WillQos: 0,
  103. WillRetained: false,
  104. ProtocolVersion: 0,
  105. protocolVersionExplicit: false,
  106. KeepAlive: 30,
  107. PingTimeout: 10 * time.Second,
  108. ConnectTimeout: 30 * time.Second,
  109. MaxReconnectInterval: 10 * time.Minute,
  110. AutoReconnect: true,
  111. ConnectRetryInterval: 30 * time.Second,
  112. ConnectRetry: false,
  113. Store: nil,
  114. OnConnect: nil,
  115. OnConnectionLost: DefaultConnectionLostHandler,
  116. OnConnectAttempt: nil,
  117. WriteTimeout: 0, // 0 represents timeout disabled
  118. ResumeSubs: false,
  119. HTTPHeaders: make(map[string][]string),
  120. WebsocketOptions: &WebsocketOptions{},
  121. }
  122. return o
  123. }
  124. // AddBroker adds a broker URI to the list of brokers to be used. The format should be
  125. // scheme://host:port
  126. // Where "scheme" is one of "tcp", "ssl", or "ws", "host" is the ip-address (or hostname)
  127. // and "port" is the port on which the broker is accepting connections.
  128. //
  129. // Default values for hostname is "127.0.0.1", for schema is "tcp://".
  130. //
  131. // An example broker URI would look like: tcp://foobar.com:1883
  132. func (o *ClientOptions) AddBroker(server string) *ClientOptions {
  133. if len(server) > 0 && server[0] == ':' {
  134. server = "127.0.0.1" + server
  135. }
  136. if !strings.Contains(server, "://") {
  137. server = "tcp://" + server
  138. }
  139. brokerURI, err := url.Parse(server)
  140. if err != nil {
  141. ERROR.Println(CLI, "Failed to parse %q broker address: %s", server, err)
  142. return o
  143. }
  144. o.Servers = append(o.Servers, brokerURI)
  145. return o
  146. }
  147. // SetResumeSubs will enable resuming of stored (un)subscribe messages when connecting
  148. // but not reconnecting if CleanSession is false. Otherwise these messages are discarded.
  149. func (o *ClientOptions) SetResumeSubs(resume bool) *ClientOptions {
  150. o.ResumeSubs = resume
  151. return o
  152. }
  153. // SetClientID will set the client id to be used by this client when
  154. // connecting to the MQTT broker. According to the MQTT v3.1 specification,
  155. // a client id must be no longer than 23 characters.
  156. func (o *ClientOptions) SetClientID(id string) *ClientOptions {
  157. o.ClientID = id
  158. return o
  159. }
  160. // SetUsername will set the username to be used by this client when connecting
  161. // to the MQTT broker. Note: without the use of SSL/TLS, this information will
  162. // be sent in plaintext across the wire.
  163. func (o *ClientOptions) SetUsername(u string) *ClientOptions {
  164. o.Username = u
  165. return o
  166. }
  167. // SetPassword will set the password to be used by this client when connecting
  168. // to the MQTT broker. Note: without the use of SSL/TLS, this information will
  169. // be sent in plaintext across the wire.
  170. func (o *ClientOptions) SetPassword(p string) *ClientOptions {
  171. o.Password = p
  172. return o
  173. }
  174. // SetCredentialsProvider will set a method to be called by this client when
  175. // connecting to the MQTT broker that provide the current username and password.
  176. // Note: without the use of SSL/TLS, this information will be sent
  177. // in plaintext across the wire.
  178. func (o *ClientOptions) SetCredentialsProvider(p CredentialsProvider) *ClientOptions {
  179. o.CredentialsProvider = p
  180. return o
  181. }
  182. // SetCleanSession will set the "clean session" flag in the connect message
  183. // when this client connects to an MQTT broker. By setting this flag, you are
  184. // indicating that no messages saved by the broker for this client should be
  185. // delivered. Any messages that were going to be sent by this client before
  186. // disconnecting previously but didn't will not be sent upon connecting to the
  187. // broker.
  188. func (o *ClientOptions) SetCleanSession(clean bool) *ClientOptions {
  189. o.CleanSession = clean
  190. return o
  191. }
  192. // SetOrderMatters will set the message routing to guarantee order within
  193. // each QoS level. By default, this value is true. If set to false (recommended),
  194. // this flag indicates that messages can be delivered asynchronously
  195. // from the client to the application and possibly arrive out of order.
  196. // Specifically, the message handler is called in its own go routine.
  197. // Note that setting this to true does not guarantee in-order delivery
  198. // (this is subject to broker settings like "max_inflight_messages=1" in mosquitto)
  199. // and if true then handlers must not block.
  200. func (o *ClientOptions) SetOrderMatters(order bool) *ClientOptions {
  201. o.Order = order
  202. return o
  203. }
  204. // SetTLSConfig will set an SSL/TLS configuration to be used when connecting
  205. // to an MQTT broker. Please read the official Go documentation for more
  206. // information.
  207. func (o *ClientOptions) SetTLSConfig(t *tls.Config) *ClientOptions {
  208. o.TLSConfig = t
  209. return o
  210. }
  211. // SetStore will set the implementation of the Store interface
  212. // used to provide message persistence in cases where QoS levels
  213. // QoS_ONE or QoS_TWO are used. If no store is provided, then the
  214. // client will use MemoryStore by default.
  215. func (o *ClientOptions) SetStore(s Store) *ClientOptions {
  216. o.Store = s
  217. return o
  218. }
  219. // SetKeepAlive will set the amount of time (in seconds) that the client
  220. // should wait before sending a PING request to the broker. This will
  221. // allow the client to know that a connection has not been lost with the
  222. // server.
  223. func (o *ClientOptions) SetKeepAlive(k time.Duration) *ClientOptions {
  224. o.KeepAlive = int64(k / time.Second)
  225. return o
  226. }
  227. // SetPingTimeout will set the amount of time (in seconds) that the client
  228. // will wait after sending a PING request to the broker, before deciding
  229. // that the connection has been lost. Default is 10 seconds.
  230. func (o *ClientOptions) SetPingTimeout(k time.Duration) *ClientOptions {
  231. o.PingTimeout = k
  232. return o
  233. }
  234. // SetProtocolVersion sets the MQTT version to be used to connect to the
  235. // broker. Legitimate values are currently 3 - MQTT 3.1 or 4 - MQTT 3.1.1
  236. func (o *ClientOptions) SetProtocolVersion(pv uint) *ClientOptions {
  237. if (pv >= 3 && pv <= 4) || (pv > 0x80) {
  238. o.ProtocolVersion = pv
  239. o.protocolVersionExplicit = true
  240. }
  241. return o
  242. }
  243. // UnsetWill will cause any set will message to be disregarded.
  244. func (o *ClientOptions) UnsetWill() *ClientOptions {
  245. o.WillEnabled = false
  246. return o
  247. }
  248. // SetWill accepts a string will message to be set. When the client connects,
  249. // it will give this will message to the broker, which will then publish the
  250. // provided payload (the will) to any clients that are subscribed to the provided
  251. // topic.
  252. func (o *ClientOptions) SetWill(topic string, payload string, qos byte, retained bool) *ClientOptions {
  253. o.SetBinaryWill(topic, []byte(payload), qos, retained)
  254. return o
  255. }
  256. // SetBinaryWill accepts a []byte will message to be set. When the client connects,
  257. // it will give this will message to the broker, which will then publish the
  258. // provided payload (the will) to any clients that are subscribed to the provided
  259. // topic.
  260. func (o *ClientOptions) SetBinaryWill(topic string, payload []byte, qos byte, retained bool) *ClientOptions {
  261. o.WillEnabled = true
  262. o.WillTopic = topic
  263. o.WillPayload = payload
  264. o.WillQos = qos
  265. o.WillRetained = retained
  266. return o
  267. }
  268. // SetDefaultPublishHandler sets the MessageHandler that will be called when a message
  269. // is received that does not match any known subscriptions.
  270. //
  271. // If OrderMatters is true (the defaultHandler) then callback must not block or
  272. // call functions within this package that may block (e.g. Publish) other than in
  273. // a new go routine.
  274. // defaultHandler must be safe for concurrent use by multiple goroutines.
  275. func (o *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions {
  276. o.DefaultPublishHandler = defaultHandler
  277. return o
  278. }
  279. // SetOnConnectHandler sets the function to be called when the client is connected. Both
  280. // at initial connection time and upon automatic reconnect.
  281. func (o *ClientOptions) SetOnConnectHandler(onConn OnConnectHandler) *ClientOptions {
  282. o.OnConnect = onConn
  283. return o
  284. }
  285. // SetConnectionLostHandler will set the OnConnectionLost callback to be executed
  286. // in the case where the client unexpectedly loses connection with the MQTT broker.
  287. func (o *ClientOptions) SetConnectionLostHandler(onLost ConnectionLostHandler) *ClientOptions {
  288. o.OnConnectionLost = onLost
  289. return o
  290. }
  291. // SetReconnectingHandler sets the OnReconnecting callback to be executed prior
  292. // to the client attempting a reconnect to the MQTT broker.
  293. func (o *ClientOptions) SetReconnectingHandler(cb ReconnectHandler) *ClientOptions {
  294. o.OnReconnecting = cb
  295. return o
  296. }
  297. // SetConnectionAttemptHandler sets the ConnectionAttemptHandler callback to be executed prior
  298. // to each attempt to connect to an MQTT broker. Returns the *tls.Config that will be used when establishing
  299. // the connection (a copy of the tls.Config from ClientOptions will be passed in along with the broker URL).
  300. // This allows connection specific changes to be made to the *tls.Config.
  301. func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionAttemptHandler) *ClientOptions {
  302. o.OnConnectAttempt = onConnectAttempt
  303. return o
  304. }
  305. // SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a
  306. // timeout error. A duration of 0 never times out. Default never times out
  307. func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions {
  308. o.WriteTimeout = t
  309. return o
  310. }
  311. // SetConnectTimeout limits how long the client will wait when trying to open a connection
  312. // to an MQTT server before timing out. A duration of 0 never times out.
  313. // Default 30 seconds. Currently only operational on TCP/TLS connections.
  314. func (o *ClientOptions) SetConnectTimeout(t time.Duration) *ClientOptions {
  315. o.ConnectTimeout = t
  316. return o
  317. }
  318. // SetMaxReconnectInterval sets the maximum time that will be waited between reconnection attempts
  319. // when connection is lost
  320. func (o *ClientOptions) SetMaxReconnectInterval(t time.Duration) *ClientOptions {
  321. o.MaxReconnectInterval = t
  322. return o
  323. }
  324. // SetAutoReconnect sets whether the automatic reconnection logic should be used
  325. // when the connection is lost, even if disabled the ConnectionLostHandler is still
  326. // called
  327. func (o *ClientOptions) SetAutoReconnect(a bool) *ClientOptions {
  328. o.AutoReconnect = a
  329. return o
  330. }
  331. // SetConnectRetryInterval sets the time that will be waited between connection attempts
  332. // when initially connecting if ConnectRetry is TRUE
  333. func (o *ClientOptions) SetConnectRetryInterval(t time.Duration) *ClientOptions {
  334. o.ConnectRetryInterval = t
  335. return o
  336. }
  337. // SetConnectRetry sets whether the connect function will automatically retry the connection
  338. // in the event of a failure (when true the token returned by the Connect function will
  339. // not complete until the connection is up or it is cancelled)
  340. // If ConnectRetry is true then subscriptions should be requested in OnConnect handler
  341. // Setting this to TRUE permits messages to be published before the connection is established
  342. func (o *ClientOptions) SetConnectRetry(a bool) *ClientOptions {
  343. o.ConnectRetry = a
  344. return o
  345. }
  346. // SetMessageChannelDepth DEPRECATED The value set here no longer has any effect, this function
  347. // remains so the API is not altered.
  348. func (o *ClientOptions) SetMessageChannelDepth(s uint) *ClientOptions {
  349. o.MessageChannelDepth = s
  350. return o
  351. }
  352. // SetHTTPHeaders sets the additional HTTP headers that will be sent in the WebSocket
  353. // opening handshake.
  354. func (o *ClientOptions) SetHTTPHeaders(h http.Header) *ClientOptions {
  355. o.HTTPHeaders = h
  356. return o
  357. }
  358. // SetWebsocketOptions sets the additional websocket options used in a WebSocket connection
  359. func (o *ClientOptions) SetWebsocketOptions(w *WebsocketOptions) *ClientOptions {
  360. o.WebsocketOptions = w
  361. return o
  362. }