123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- /*
- * Copyright (c) 2013 IBM Corp.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Seth Hoenig
- * Allan Stockdill-Mander
- * Mike Robertson
- */
- package mqtt
- import (
- "container/list"
- "strings"
- "sync"
- "github.com/eclipse/paho.mqtt.golang/packets"
- )
- // route is a type which associates MQTT Topic strings with a
- // callback to be executed upon the arrival of a message associated
- // with a subscription to that topic.
- type route struct {
- topic string
- callback MessageHandler
- }
- // match takes a slice of strings which represent the route being tested having been split on '/'
- // separators, and a slice of strings representing the topic string in the published message, similarly
- // split.
- // The function determines if the topic string matches the route according to the MQTT topic rules
- // and returns a boolean of the outcome
- func match(route []string, topic []string) bool {
- if len(route) == 0 {
- return len(topic) == 0
- }
- if len(topic) == 0 {
- return route[0] == "#"
- }
- if route[0] == "#" {
- return true
- }
- if (route[0] == "+") || (route[0] == topic[0]) {
- return match(route[1:], topic[1:])
- }
- return false
- }
- func routeIncludesTopic(route, topic string) bool {
- return match(routeSplit(route), strings.Split(topic, "/"))
- }
- // removes $share and sharename when splitting the route to allow
- // shared subscription routes to correctly match the topic
- func routeSplit(route string) []string {
- var result []string
- if strings.HasPrefix(route, "$share") {
- result = strings.Split(route, "/")[2:]
- } else {
- result = strings.Split(route, "/")
- }
- return result
- }
- // match takes the topic string of the published message and does a basic compare to the
- // string of the current Route, if they match it returns true
- func (r *route) match(topic string) bool {
- return r.topic == topic || routeIncludesTopic(r.topic, topic)
- }
- type router struct {
- sync.RWMutex
- routes *list.List
- defaultHandler MessageHandler
- messages chan *packets.PublishPacket
- }
- // newRouter returns a new instance of a Router and channel which can be used to tell the Router
- // to stop
- func newRouter() *router {
- router := &router{routes: list.New(), messages: make(chan *packets.PublishPacket)}
- return router
- }
- // addRoute takes a topic string and MessageHandler callback. It looks in the current list of
- // routes to see if there is already a matching Route. If there is it replaces the current
- // callback with the new one. If not it add a new entry to the list of Routes.
- func (r *router) addRoute(topic string, callback MessageHandler) {
- r.Lock()
- defer r.Unlock()
- for e := r.routes.Front(); e != nil; e = e.Next() {
- if e.Value.(*route).topic == topic {
- r := e.Value.(*route)
- r.callback = callback
- return
- }
- }
- r.routes.PushBack(&route{topic: topic, callback: callback})
- }
- // deleteRoute takes a route string, looks for a matching Route in the list of Routes. If
- // found it removes the Route from the list.
- func (r *router) deleteRoute(topic string) {
- r.Lock()
- defer r.Unlock()
- for e := r.routes.Front(); e != nil; e = e.Next() {
- if e.Value.(*route).topic == topic {
- r.routes.Remove(e)
- return
- }
- }
- }
- // setDefaultHandler assigns a default callback that will be called if no matching Route
- // is found for an incoming Publish.
- func (r *router) setDefaultHandler(handler MessageHandler) {
- r.Lock()
- defer r.Unlock()
- r.defaultHandler = handler
- }
- // matchAndDispatch takes a channel of Message pointers as input and starts a go routine that
- // takes messages off the channel, matches them against the internal route list and calls the
- // associated callback (or the defaultHandler, if one exists and no other route matched). If
- // anything is sent down the stop channel the function will end.
- func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
- var wg sync.WaitGroup
- ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
- var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel
- stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan
- ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
- goRoutinesDone := make(chan struct{}) // closed on wg.Done()
- if order {
- ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
- } else {
- // When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
- ackInChan = make(chan *PacketAndToken)
- go func() { // go routine to copy from ackInChan to ackOutChan until stopped
- for {
- select {
- case a := <-ackInChan:
- ackOutChan <- a
- case <-stopAckCopy:
- close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
- for {
- select {
- case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
- DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
- case <-goRoutinesDone:
- close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
- DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
- return
- }
- }
- }
- }
- }()
- }
- go func() { // Main go routine handling inbound messages
- for message := range messages {
- // DEBUG.Println(ROU, "matchAndDispatch received message")
- sent := false
- r.RLock()
- m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
- var handlers []MessageHandler
- for e := r.routes.Front(); e != nil; e = e.Next() {
- if e.Value.(*route).match(message.TopicName) {
- if order {
- handlers = append(handlers, e.Value.(*route).callback)
- } else {
- hd := e.Value.(*route).callback
- wg.Add(1)
- go func() {
- hd(client, m)
- m.Ack()
- wg.Done()
- }()
- }
- sent = true
- }
- }
- if !sent {
- if r.defaultHandler != nil {
- if order {
- handlers = append(handlers, r.defaultHandler)
- } else {
- wg.Add(1)
- go func() {
- r.defaultHandler(client, m)
- m.Ack()
- wg.Done()
- }()
- }
- } else {
- DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
- }
- }
- r.RUnlock()
- for _, handler := range handlers {
- handler(client, m)
- m.Ack()
- }
- // DEBUG.Println(ROU, "matchAndDispatch handled message")
- }
- if order {
- close(ackOutChan)
- } else { // Ensure that nothing further will be written to ackOutChan before closing it
- close(stopAckCopy)
- <-ackCopyStopped
- close(ackOutChan)
- go func() {
- wg.Wait() // Note: If this remains running then the user has handlers that are not returning
- close(goRoutinesDone)
- }()
- }
- DEBUG.Println(ROU, "matchAndDispatch exiting")
- }()
- return ackOutChan
- }
|