router.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. /*
  2. * Copyright (c) 2013 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Seth Hoenig
  11. * Allan Stockdill-Mander
  12. * Mike Robertson
  13. */
  14. package mqtt
  15. import (
  16. "container/list"
  17. "strings"
  18. "sync"
  19. "github.com/eclipse/paho.mqtt.golang/packets"
  20. )
  21. // route is a type which associates MQTT Topic strings with a
  22. // callback to be executed upon the arrival of a message associated
  23. // with a subscription to that topic.
  24. type route struct {
  25. topic string
  26. callback MessageHandler
  27. }
  28. // match takes a slice of strings which represent the route being tested having been split on '/'
  29. // separators, and a slice of strings representing the topic string in the published message, similarly
  30. // split.
  31. // The function determines if the topic string matches the route according to the MQTT topic rules
  32. // and returns a boolean of the outcome
  33. func match(route []string, topic []string) bool {
  34. if len(route) == 0 {
  35. return len(topic) == 0
  36. }
  37. if len(topic) == 0 {
  38. return route[0] == "#"
  39. }
  40. if route[0] == "#" {
  41. return true
  42. }
  43. if (route[0] == "+") || (route[0] == topic[0]) {
  44. return match(route[1:], topic[1:])
  45. }
  46. return false
  47. }
  48. func routeIncludesTopic(route, topic string) bool {
  49. return match(routeSplit(route), strings.Split(topic, "/"))
  50. }
  51. // removes $share and sharename when splitting the route to allow
  52. // shared subscription routes to correctly match the topic
  53. func routeSplit(route string) []string {
  54. var result []string
  55. if strings.HasPrefix(route, "$share") {
  56. result = strings.Split(route, "/")[2:]
  57. } else {
  58. result = strings.Split(route, "/")
  59. }
  60. return result
  61. }
  62. // match takes the topic string of the published message and does a basic compare to the
  63. // string of the current Route, if they match it returns true
  64. func (r *route) match(topic string) bool {
  65. return r.topic == topic || routeIncludesTopic(r.topic, topic)
  66. }
  67. type router struct {
  68. sync.RWMutex
  69. routes *list.List
  70. defaultHandler MessageHandler
  71. messages chan *packets.PublishPacket
  72. }
  73. // newRouter returns a new instance of a Router and channel which can be used to tell the Router
  74. // to stop
  75. func newRouter() *router {
  76. router := &router{routes: list.New(), messages: make(chan *packets.PublishPacket)}
  77. return router
  78. }
  79. // addRoute takes a topic string and MessageHandler callback. It looks in the current list of
  80. // routes to see if there is already a matching Route. If there is it replaces the current
  81. // callback with the new one. If not it add a new entry to the list of Routes.
  82. func (r *router) addRoute(topic string, callback MessageHandler) {
  83. r.Lock()
  84. defer r.Unlock()
  85. for e := r.routes.Front(); e != nil; e = e.Next() {
  86. if e.Value.(*route).topic == topic {
  87. r := e.Value.(*route)
  88. r.callback = callback
  89. return
  90. }
  91. }
  92. r.routes.PushBack(&route{topic: topic, callback: callback})
  93. }
  94. // deleteRoute takes a route string, looks for a matching Route in the list of Routes. If
  95. // found it removes the Route from the list.
  96. func (r *router) deleteRoute(topic string) {
  97. r.Lock()
  98. defer r.Unlock()
  99. for e := r.routes.Front(); e != nil; e = e.Next() {
  100. if e.Value.(*route).topic == topic {
  101. r.routes.Remove(e)
  102. return
  103. }
  104. }
  105. }
  106. // setDefaultHandler assigns a default callback that will be called if no matching Route
  107. // is found for an incoming Publish.
  108. func (r *router) setDefaultHandler(handler MessageHandler) {
  109. r.Lock()
  110. defer r.Unlock()
  111. r.defaultHandler = handler
  112. }
  113. // matchAndDispatch takes a channel of Message pointers as input and starts a go routine that
  114. // takes messages off the channel, matches them against the internal route list and calls the
  115. // associated callback (or the defaultHandler, if one exists and no other route matched). If
  116. // anything is sent down the stop channel the function will end.
  117. func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
  118. var wg sync.WaitGroup
  119. ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
  120. var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel
  121. stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan
  122. ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
  123. goRoutinesDone := make(chan struct{}) // closed on wg.Done()
  124. if order {
  125. ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
  126. } else {
  127. // When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
  128. ackInChan = make(chan *PacketAndToken)
  129. go func() { // go routine to copy from ackInChan to ackOutChan until stopped
  130. for {
  131. select {
  132. case a := <-ackInChan:
  133. ackOutChan <- a
  134. case <-stopAckCopy:
  135. close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
  136. for {
  137. select {
  138. case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
  139. DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
  140. case <-goRoutinesDone:
  141. close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
  142. DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
  143. return
  144. }
  145. }
  146. }
  147. }
  148. }()
  149. }
  150. go func() { // Main go routine handling inbound messages
  151. for message := range messages {
  152. // DEBUG.Println(ROU, "matchAndDispatch received message")
  153. sent := false
  154. r.RLock()
  155. m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
  156. var handlers []MessageHandler
  157. for e := r.routes.Front(); e != nil; e = e.Next() {
  158. if e.Value.(*route).match(message.TopicName) {
  159. if order {
  160. handlers = append(handlers, e.Value.(*route).callback)
  161. } else {
  162. hd := e.Value.(*route).callback
  163. wg.Add(1)
  164. go func() {
  165. hd(client, m)
  166. m.Ack()
  167. wg.Done()
  168. }()
  169. }
  170. sent = true
  171. }
  172. }
  173. if !sent {
  174. if r.defaultHandler != nil {
  175. if order {
  176. handlers = append(handlers, r.defaultHandler)
  177. } else {
  178. wg.Add(1)
  179. go func() {
  180. r.defaultHandler(client, m)
  181. m.Ack()
  182. wg.Done()
  183. }()
  184. }
  185. } else {
  186. DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
  187. }
  188. }
  189. r.RUnlock()
  190. for _, handler := range handlers {
  191. handler(client, m)
  192. m.Ack()
  193. }
  194. // DEBUG.Println(ROU, "matchAndDispatch handled message")
  195. }
  196. if order {
  197. close(ackOutChan)
  198. } else { // Ensure that nothing further will be written to ackOutChan before closing it
  199. close(stopAckCopy)
  200. <-ackCopyStopped
  201. close(ackOutChan)
  202. go func() {
  203. wg.Wait() // Note: If this remains running then the user has handlers that are not returning
  204. close(goRoutinesDone)
  205. }()
  206. }
  207. DEBUG.Println(ROU, "matchAndDispatch exiting")
  208. }()
  209. return ackOutChan
  210. }