filestore.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. /*
  2. * Copyright (c) 2013 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Seth Hoenig
  11. * Allan Stockdill-Mander
  12. * Mike Robertson
  13. */
  14. package mqtt
  15. import (
  16. "io/ioutil"
  17. "os"
  18. "path"
  19. "sort"
  20. "sync"
  21. "github.com/eclipse/paho.mqtt.golang/packets"
  22. )
  23. const (
  24. msgExt = ".msg"
  25. tmpExt = ".tmp"
  26. corruptExt = ".CORRUPT"
  27. )
  28. // FileStore implements the store interface using the filesystem to provide
  29. // true persistence, even across client failure. This is designed to use a
  30. // single directory per running client. If you are running multiple clients
  31. // on the same filesystem, you will need to be careful to specify unique
  32. // store directories for each.
  33. type FileStore struct {
  34. sync.RWMutex
  35. directory string
  36. opened bool
  37. }
  38. // NewFileStore will create a new FileStore which stores its messages in the
  39. // directory provided.
  40. func NewFileStore(directory string) *FileStore {
  41. store := &FileStore{
  42. directory: directory,
  43. opened: false,
  44. }
  45. return store
  46. }
  47. // Open will allow the FileStore to be used.
  48. func (store *FileStore) Open() {
  49. store.Lock()
  50. defer store.Unlock()
  51. // if no store directory was specified in ClientOpts, by default use the
  52. // current working directory
  53. if store.directory == "" {
  54. store.directory, _ = os.Getwd()
  55. }
  56. // if store dir exists, great, otherwise, create it
  57. if !exists(store.directory) {
  58. perms := os.FileMode(0770)
  59. merr := os.MkdirAll(store.directory, perms)
  60. chkerr(merr)
  61. }
  62. store.opened = true
  63. DEBUG.Println(STR, "store is opened at", store.directory)
  64. }
  65. // Close will disallow the FileStore from being used.
  66. func (store *FileStore) Close() {
  67. store.Lock()
  68. defer store.Unlock()
  69. store.opened = false
  70. DEBUG.Println(STR, "store is closed")
  71. }
  72. // Put will put a message into the store, associated with the provided
  73. // key value.
  74. func (store *FileStore) Put(key string, m packets.ControlPacket) {
  75. store.Lock()
  76. defer store.Unlock()
  77. if !store.opened {
  78. ERROR.Println(STR, "Trying to use file store, but not open")
  79. return
  80. }
  81. full := fullpath(store.directory, key)
  82. write(store.directory, key, m)
  83. if !exists(full) {
  84. ERROR.Println(STR, "file not created:", full)
  85. }
  86. }
  87. // Get will retrieve a message from the store, the one associated with
  88. // the provided key value.
  89. func (store *FileStore) Get(key string) packets.ControlPacket {
  90. store.RLock()
  91. defer store.RUnlock()
  92. if !store.opened {
  93. ERROR.Println(STR, "trying to use file store, but not open")
  94. return nil
  95. }
  96. filepath := fullpath(store.directory, key)
  97. if !exists(filepath) {
  98. return nil
  99. }
  100. mfile, oerr := os.Open(filepath)
  101. chkerr(oerr)
  102. msg, rerr := packets.ReadPacket(mfile)
  103. chkerr(mfile.Close())
  104. // Message was unreadable, return nil
  105. if rerr != nil {
  106. newpath := corruptpath(store.directory, key)
  107. WARN.Println(STR, "corrupted file detected:", rerr.Error(), "archived at:", newpath)
  108. if err := os.Rename(filepath, newpath); err != nil {
  109. ERROR.Println(STR, err)
  110. }
  111. return nil
  112. }
  113. return msg
  114. }
  115. // All will provide a list of all of the keys associated with messages
  116. // currently residing in the FileStore.
  117. func (store *FileStore) All() []string {
  118. store.RLock()
  119. defer store.RUnlock()
  120. return store.all()
  121. }
  122. // Del will remove the persisted message associated with the provided
  123. // key from the FileStore.
  124. func (store *FileStore) Del(key string) {
  125. store.Lock()
  126. defer store.Unlock()
  127. store.del(key)
  128. }
  129. // Reset will remove all persisted messages from the FileStore.
  130. func (store *FileStore) Reset() {
  131. store.Lock()
  132. defer store.Unlock()
  133. WARN.Println(STR, "FileStore Reset")
  134. for _, key := range store.all() {
  135. store.del(key)
  136. }
  137. }
  138. // lockless
  139. func (store *FileStore) all() []string {
  140. var err error
  141. var keys []string
  142. var files fileInfos
  143. if !store.opened {
  144. ERROR.Println(STR, "trying to use file store, but not open")
  145. return nil
  146. }
  147. files, err = ioutil.ReadDir(store.directory)
  148. chkerr(err)
  149. sort.Sort(files)
  150. for _, f := range files {
  151. DEBUG.Println(STR, "file in All():", f.Name())
  152. name := f.Name()
  153. if name[len(name)-4:] != msgExt {
  154. DEBUG.Println(STR, "skipping file, doesn't have right extension: ", name)
  155. continue
  156. }
  157. key := name[0 : len(name)-4] // remove file extension
  158. keys = append(keys, key)
  159. }
  160. return keys
  161. }
  162. // lockless
  163. func (store *FileStore) del(key string) {
  164. if !store.opened {
  165. ERROR.Println(STR, "trying to use file store, but not open")
  166. return
  167. }
  168. DEBUG.Println(STR, "store del filepath:", store.directory)
  169. DEBUG.Println(STR, "store delete key:", key)
  170. filepath := fullpath(store.directory, key)
  171. DEBUG.Println(STR, "path of deletion:", filepath)
  172. if !exists(filepath) {
  173. WARN.Println(STR, "store could not delete key:", key)
  174. return
  175. }
  176. rerr := os.Remove(filepath)
  177. chkerr(rerr)
  178. DEBUG.Println(STR, "del msg:", key)
  179. if exists(filepath) {
  180. ERROR.Println(STR, "file not deleted:", filepath)
  181. }
  182. }
  183. func fullpath(store string, key string) string {
  184. p := path.Join(store, key+msgExt)
  185. return p
  186. }
  187. func tmppath(store string, key string) string {
  188. p := path.Join(store, key+tmpExt)
  189. return p
  190. }
  191. func corruptpath(store string, key string) string {
  192. p := path.Join(store, key+corruptExt)
  193. return p
  194. }
  195. // create file called "X.[messageid].tmp" located in the store
  196. // the contents of the file is the bytes of the message, then
  197. // rename it to "X.[messageid].msg", overwriting any existing
  198. // message with the same id
  199. // X will be 'i' for inbound messages, and O for outbound messages
  200. func write(store, key string, m packets.ControlPacket) {
  201. temppath := tmppath(store, key)
  202. f, err := os.Create(temppath)
  203. chkerr(err)
  204. werr := m.Write(f)
  205. chkerr(werr)
  206. cerr := f.Close()
  207. chkerr(cerr)
  208. rerr := os.Rename(temppath, fullpath(store, key))
  209. chkerr(rerr)
  210. }
  211. func exists(file string) bool {
  212. if _, err := os.Stat(file); err != nil {
  213. if os.IsNotExist(err) {
  214. return false
  215. }
  216. chkerr(err)
  217. }
  218. return true
  219. }
  220. type fileInfos []os.FileInfo
  221. func (f fileInfos) Len() int {
  222. return len(f)
  223. }
  224. func (f fileInfos) Swap(i, j int) {
  225. f[i], f[j] = f[j], f[i]
  226. }
  227. func (f fileInfos) Less(i, j int) bool {
  228. return f[i].ModTime().Before(f[j].ModTime())
  229. }