messageids.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. /*
  2. * Copyright (c) 2013 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Seth Hoenig
  11. * Allan Stockdill-Mander
  12. * Mike Robertson
  13. */
  14. package mqtt
  15. import (
  16. "fmt"
  17. "sync"
  18. "time"
  19. )
  20. // MId is 16 bit message id as specified by the MQTT spec.
  21. // In general, these values should not be depended upon by
  22. // the client application.
  23. type MId uint16
  24. type messageIds struct {
  25. sync.RWMutex
  26. index map[uint16]tokenCompletor
  27. lastIssuedID uint16 // The most recently issued ID. Used so we cycle through ids rather than immediately reusing them (can make debugging easier)
  28. }
  29. const (
  30. midMin uint16 = 1
  31. midMax uint16 = 65535
  32. )
  33. func (mids *messageIds) cleanUp() {
  34. mids.Lock()
  35. for _, token := range mids.index {
  36. switch token.(type) {
  37. case *PublishToken:
  38. token.setError(fmt.Errorf("connection lost before Publish completed"))
  39. case *SubscribeToken:
  40. token.setError(fmt.Errorf("connection lost before Subscribe completed"))
  41. case *UnsubscribeToken:
  42. token.setError(fmt.Errorf("connection lost before Unsubscribe completed"))
  43. case nil:
  44. continue
  45. }
  46. token.flowComplete()
  47. }
  48. mids.index = make(map[uint16]tokenCompletor)
  49. mids.Unlock()
  50. DEBUG.Println(MID, "cleaned up")
  51. }
  52. func (mids *messageIds) freeID(id uint16) {
  53. mids.Lock()
  54. delete(mids.index, id)
  55. mids.Unlock()
  56. }
  57. func (mids *messageIds) claimID(token tokenCompletor, id uint16) {
  58. mids.Lock()
  59. defer mids.Unlock()
  60. if _, ok := mids.index[id]; !ok {
  61. mids.index[id] = token
  62. } else {
  63. old := mids.index[id]
  64. old.flowComplete()
  65. mids.index[id] = token
  66. }
  67. if id > mids.lastIssuedID {
  68. mids.lastIssuedID = id
  69. }
  70. }
  71. // getID will return an available id or 0 if none available
  72. // The id will generally be the previous id + 1 (because this makes tracing messages a bit simpler)
  73. func (mids *messageIds) getID(t tokenCompletor) uint16 {
  74. mids.Lock()
  75. defer mids.Unlock()
  76. i := mids.lastIssuedID // note: the only situation where lastIssuedID is 0 the map will be empty
  77. looped := false // uint16 will loop from 65535->0
  78. for {
  79. i++
  80. if i == 0 { // skip 0 because its not a valid id (Control Packets MUST contain a non-zero 16-bit Packet Identifier [MQTT-2.3.1-1])
  81. i++
  82. looped = true
  83. }
  84. if _, ok := mids.index[i]; !ok {
  85. mids.index[i] = t
  86. mids.lastIssuedID = i
  87. return i
  88. }
  89. if (looped && i == mids.lastIssuedID) || (mids.lastIssuedID == 0 && i == midMax) { // lastIssuedID will be 0 at startup
  90. return 0 // no free ids
  91. }
  92. }
  93. }
  94. func (mids *messageIds) getToken(id uint16) tokenCompletor {
  95. mids.RLock()
  96. defer mids.RUnlock()
  97. if token, ok := mids.index[id]; ok {
  98. return token
  99. }
  100. return &DummyToken{id: id}
  101. }
  102. type DummyToken struct {
  103. id uint16
  104. }
  105. // Wait implements the Token Wait method.
  106. func (d *DummyToken) Wait() bool {
  107. return true
  108. }
  109. // WaitTimeout implements the Token WaitTimeout method.
  110. func (d *DummyToken) WaitTimeout(t time.Duration) bool {
  111. return true
  112. }
  113. // Done implements the Token Done method.
  114. func (d *DummyToken) Done() <-chan struct{} {
  115. ch := make(chan struct{})
  116. close(ch)
  117. return ch
  118. }
  119. func (d *DummyToken) flowComplete() {
  120. ERROR.Printf("A lookup for token %d returned nil\n", d.id)
  121. }
  122. func (d *DummyToken) Error() error {
  123. return nil
  124. }
  125. func (d *DummyToken) setError(e error) {}
  126. // PlaceHolderToken does nothing and was implemented to allow a messageid to be reserved
  127. // it differs from DummyToken in that calling flowComplete does not generate an error (it
  128. // is expected that flowComplete will be called when the token is overwritten with a real token)
  129. type PlaceHolderToken struct {
  130. id uint16
  131. }
  132. // Wait implements the Token Wait method.
  133. func (p *PlaceHolderToken) Wait() bool {
  134. return true
  135. }
  136. // WaitTimeout implements the Token WaitTimeout method.
  137. func (p *PlaceHolderToken) WaitTimeout(t time.Duration) bool {
  138. return true
  139. }
  140. // Done implements the Token Done method.
  141. func (p *PlaceHolderToken) Done() <-chan struct{} {
  142. ch := make(chan struct{})
  143. close(ch)
  144. return ch
  145. }
  146. func (p *PlaceHolderToken) flowComplete() {
  147. }
  148. func (p *PlaceHolderToken) Error() error {
  149. return nil
  150. }
  151. func (p *PlaceHolderToken) setError(e error) {}