store.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. /*
  2. * Copyright (c) 2013 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Seth Hoenig
  11. * Allan Stockdill-Mander
  12. * Mike Robertson
  13. */
  14. package mqtt
  15. import (
  16. "fmt"
  17. "strconv"
  18. "github.com/eclipse/paho.mqtt.golang/packets"
  19. )
  // inboundPrefix is prepended to a message ID to form the store key of a
  // message travelling in the inbound direction.
  const inboundPrefix = "i."

  // outboundPrefix is prepended to a message ID to form the store key of a
  // message travelling in the outbound direction.
  const outboundPrefix = "o."
  24. // Store is an interface which can be used to provide implementations
  25. // for message persistence.
  26. // Because we may have to store distinct messages with the same
  27. // message ID, we need a unique key for each message. This is
  28. // possible by prepending "i." or "o." to each message id
  29. type Store interface {
  30. Open()
  31. Put(key string, message packets.ControlPacket)
  32. Get(key string) packets.ControlPacket
  33. All() []string
  34. Del(key string)
  35. Close()
  36. Reset()
  37. }
  38. // A key MUST have the form "X.[messageid]"
  39. // where X is 'i' or 'o'
  40. func mIDFromKey(key string) uint16 {
  41. s := key[2:]
  42. i, err := strconv.Atoi(s)
  43. chkerr(err)
  44. return uint16(i)
  45. }
  46. // Return true if key prefix is outbound
  47. func isKeyOutbound(key string) bool {
  48. return key[:2] == outboundPrefix
  49. }
  50. // Return true if key prefix is inbound
  51. func isKeyInbound(key string) bool {
  52. return key[:2] == inboundPrefix
  53. }
  54. // Return a string of the form "i.[id]"
  55. func inboundKeyFromMID(id uint16) string {
  56. return fmt.Sprintf("%s%d", inboundPrefix, id)
  57. }
  58. // Return a string of the form "o.[id]"
  59. func outboundKeyFromMID(id uint16) string {
  60. return fmt.Sprintf("%s%d", outboundPrefix, id)
  61. }
  62. // govern which outgoing messages are persisted
  63. func persistOutbound(s Store, m packets.ControlPacket) {
  64. switch m.Details().Qos {
  65. case 0:
  66. switch m.(type) {
  67. case *packets.PubackPacket, *packets.PubcompPacket:
  68. // Sending puback. delete matching publish
  69. // from ibound
  70. s.Del(inboundKeyFromMID(m.Details().MessageID))
  71. }
  72. case 1:
  73. switch m.(type) {
  74. case *packets.PublishPacket, *packets.PubrelPacket, *packets.SubscribePacket, *packets.UnsubscribePacket:
  75. // Sending publish. store in obound
  76. // until puback received
  77. s.Put(outboundKeyFromMID(m.Details().MessageID), m)
  78. default:
  79. ERROR.Println(STR, "Asked to persist an invalid message type")
  80. }
  81. case 2:
  82. switch m.(type) {
  83. case *packets.PublishPacket:
  84. // Sending publish. store in obound
  85. // until pubrel received
  86. s.Put(outboundKeyFromMID(m.Details().MessageID), m)
  87. default:
  88. ERROR.Println(STR, "Asked to persist an invalid message type")
  89. }
  90. }
  91. }
  92. // govern which incoming messages are persisted
  93. func persistInbound(s Store, m packets.ControlPacket) {
  94. switch m.Details().Qos {
  95. case 0:
  96. switch m.(type) {
  97. case *packets.PubackPacket, *packets.SubackPacket, *packets.UnsubackPacket, *packets.PubcompPacket:
  98. // Received a puback. delete matching publish
  99. // from obound
  100. s.Del(outboundKeyFromMID(m.Details().MessageID))
  101. case *packets.PublishPacket, *packets.PubrecPacket, *packets.PingrespPacket, *packets.ConnackPacket:
  102. default:
  103. ERROR.Println(STR, "Asked to persist an invalid messages type")
  104. }
  105. case 1:
  106. switch m.(type) {
  107. case *packets.PublishPacket, *packets.PubrelPacket:
  108. // Received a publish. store it in ibound
  109. // until puback sent
  110. s.Put(inboundKeyFromMID(m.Details().MessageID), m)
  111. default:
  112. ERROR.Println(STR, "Asked to persist an invalid messages type")
  113. }
  114. case 2:
  115. switch m.(type) {
  116. case *packets.PublishPacket:
  117. // Received a publish. store it in ibound
  118. // until pubrel received
  119. s.Put(inboundKeyFromMID(m.Details().MessageID), m)
  120. default:
  121. ERROR.Println(STR, "Asked to persist an invalid messages type")
  122. }
  123. }
  124. }