token.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. /*
  2. * Copyright (c) 2014 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Allan Stockdill-Mander
  11. */
  12. package mqtt
  13. import (
  14. "sync"
  15. "time"
  16. "github.com/eclipse/paho.mqtt.golang/packets"
  17. )
// PacketAndToken is a struct that contains both a ControlPacket and a
// Token. This struct is passed via channels between the client interface
// code and the underlying code responsible for sending and receiving
// MQTT messages.
type PacketAndToken struct {
	p packets.ControlPacket // the MQTT control packet being carried
	t tokenCompletor        // the token carried alongside p
}
// Token defines the interface for the tokens used to indicate when
// actions have completed.
type Token interface {
	// Wait will wait indefinitely for the Token to complete, i.e. for the
	// Publish to be sent and its receipt confirmed by the broker.
	Wait() bool

	// WaitTimeout takes a time.Duration to wait for the flow associated with the
	// Token to complete, returns true if it returned before the timeout or
	// returns false if the timeout occurred. In the case of a timeout the Token
	// does not have an error set in case the caller wishes to wait again.
	WaitTimeout(time.Duration) bool

	// Done returns a channel that is closed when the flow associated
	// with the Token completes. Clients should call Error after the
	// channel is closed to check if the flow completed successfully.
	//
	// Done is provided for use in select statements. Simple use cases may
	// use Wait or WaitTimeout.
	Done() <-chan struct{}

	// Error returns the error recorded for the flow associated with the
	// Token, or nil if no error has been set.
	Error() error
}
// TokenErrorSetter is the interface for tokens that can have an error
// recorded on them through the unexported setError method.
type TokenErrorSetter interface {
	setError(error)
}
// tokenCompletor is the package-internal view of a token: it combines
// Token and TokenErrorSetter with flowComplete, the method used to mark
// the token's flow as finished.
type tokenCompletor interface {
	Token
	TokenErrorSetter
	flowComplete()
}
  54. type baseToken struct {
  55. m sync.RWMutex
  56. complete chan struct{}
  57. err error
  58. }
  59. // Wait implements the Token Wait method.
  60. func (b *baseToken) Wait() bool {
  61. <-b.complete
  62. return true
  63. }
  64. // WaitTimeout implements the Token WaitTimeout method.
  65. func (b *baseToken) WaitTimeout(d time.Duration) bool {
  66. timer := time.NewTimer(d)
  67. select {
  68. case <-b.complete:
  69. if !timer.Stop() {
  70. <-timer.C
  71. }
  72. return true
  73. case <-timer.C:
  74. }
  75. return false
  76. }
  77. // Done implements the Token Done method.
  78. func (b *baseToken) Done() <-chan struct{} {
  79. return b.complete
  80. }
  81. func (b *baseToken) flowComplete() {
  82. select {
  83. case <-b.complete:
  84. default:
  85. close(b.complete)
  86. }
  87. }
  88. func (b *baseToken) Error() error {
  89. b.m.RLock()
  90. defer b.m.RUnlock()
  91. return b.err
  92. }
  93. func (b *baseToken) setError(e error) {
  94. b.m.Lock()
  95. b.err = e
  96. b.flowComplete()
  97. b.m.Unlock()
  98. }
  99. func newToken(tType byte) tokenCompletor {
  100. switch tType {
  101. case packets.Connect:
  102. return &ConnectToken{baseToken: baseToken{complete: make(chan struct{})}}
  103. case packets.Subscribe:
  104. return &SubscribeToken{baseToken: baseToken{complete: make(chan struct{})}, subResult: make(map[string]byte)}
  105. case packets.Publish:
  106. return &PublishToken{baseToken: baseToken{complete: make(chan struct{})}}
  107. case packets.Unsubscribe:
  108. return &UnsubscribeToken{baseToken: baseToken{complete: make(chan struct{})}}
  109. case packets.Disconnect:
  110. return &DisconnectToken{baseToken: baseToken{complete: make(chan struct{})}}
  111. }
  112. return nil
  113. }
// ConnectToken is an extension of Token containing the extra fields
// required to provide information about calls to Connect()
type ConnectToken struct {
	baseToken
	returnCode     byte // acknowledgement code from the connack; read via ReturnCode
	sessionPresent bool // session present field from the connack; read via SessionPresent
}
  121. // ReturnCode returns the acknowledgement code in the connack sent
  122. // in response to a Connect()
  123. func (c *ConnectToken) ReturnCode() byte {
  124. c.m.RLock()
  125. defer c.m.RUnlock()
  126. return c.returnCode
  127. }
  128. // SessionPresent returns a bool representing the value of the
  129. // session present field in the connack sent in response to a Connect()
  130. func (c *ConnectToken) SessionPresent() bool {
  131. c.m.RLock()
  132. defer c.m.RUnlock()
  133. return c.sessionPresent
  134. }
// PublishToken is an extension of Token containing the extra fields
// required to provide information about calls to Publish()
type PublishToken struct {
	baseToken
	messageID uint16 // MQTT message ID of the Publish packet; read via MessageID
}
// MessageID returns the MQTT message ID that was assigned to the
// Publish packet when it was sent to the broker.
//
// The field is read directly, without acquiring the token's mutex.
func (p *PublishToken) MessageID() uint16 {
	return p.messageID
}
// SubscribeToken is an extension of Token containing the extra fields
// required to provide information about calls to Subscribe()
type SubscribeToken struct {
	baseToken
	subs      []string        // NOTE(review): presumably the topics requested in the Subscribe call — confirm against caller
	subResult map[string]byte // topic -> return code from the broker; exposed via Result
	messageID uint16          // NOTE(review): presumably the message ID of the Subscribe packet — confirm
}
  154. // Result returns a map of topics that were subscribed to along with
  155. // the matching return code from the broker. This is either the Qos
  156. // value of the subscription or an error code.
  157. func (s *SubscribeToken) Result() map[string]byte {
  158. s.m.RLock()
  159. defer s.m.RUnlock()
  160. return s.subResult
  161. }
// UnsubscribeToken is an extension of Token containing the extra fields
// required to provide information about calls to Unsubscribe()
type UnsubscribeToken struct {
	baseToken
	messageID uint16 // NOTE(review): presumably the message ID of the Unsubscribe packet — confirm
}
// DisconnectToken is the Token used for calls to Disconnect(). It embeds
// baseToken and currently adds no fields of its own.
type DisconnectToken struct {
	baseToken
}