123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- /*
- * Copyright (c) 2013 IBM Corp.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Seth Hoenig
- * Allan Stockdill-Mander
- * Mike Robertson
- */
- package mqtt
- import (
- "io/ioutil"
- "os"
- "path"
- "sort"
- "sync"
- "github.com/eclipse/paho.mqtt.golang/packets"
- )
- const (
- msgExt = ".msg"
- tmpExt = ".tmp"
- corruptExt = ".CORRUPT"
- )
- // FileStore implements the store interface using the filesystem to provide
- // true persistence, even across client failure. This is designed to use a
- // single directory per running client. If you are running multiple clients
- // on the same filesystem, you will need to be careful to specify unique
- // store directories for each.
- type FileStore struct {
- sync.RWMutex
- directory string
- opened bool
- }
- // NewFileStore will create a new FileStore which stores its messages in the
- // directory provided.
- func NewFileStore(directory string) *FileStore {
- store := &FileStore{
- directory: directory,
- opened: false,
- }
- return store
- }
- // Open will allow the FileStore to be used.
- func (store *FileStore) Open() {
- store.Lock()
- defer store.Unlock()
- // if no store directory was specified in ClientOpts, by default use the
- // current working directory
- if store.directory == "" {
- store.directory, _ = os.Getwd()
- }
- // if store dir exists, great, otherwise, create it
- if !exists(store.directory) {
- perms := os.FileMode(0770)
- merr := os.MkdirAll(store.directory, perms)
- chkerr(merr)
- }
- store.opened = true
- DEBUG.Println(STR, "store is opened at", store.directory)
- }
- // Close will disallow the FileStore from being used.
- func (store *FileStore) Close() {
- store.Lock()
- defer store.Unlock()
- store.opened = false
- DEBUG.Println(STR, "store is closed")
- }
- // Put will put a message into the store, associated with the provided
- // key value.
- func (store *FileStore) Put(key string, m packets.ControlPacket) {
- store.Lock()
- defer store.Unlock()
- if !store.opened {
- ERROR.Println(STR, "Trying to use file store, but not open")
- return
- }
- full := fullpath(store.directory, key)
- write(store.directory, key, m)
- if !exists(full) {
- ERROR.Println(STR, "file not created:", full)
- }
- }
- // Get will retrieve a message from the store, the one associated with
- // the provided key value.
- func (store *FileStore) Get(key string) packets.ControlPacket {
- store.RLock()
- defer store.RUnlock()
- if !store.opened {
- ERROR.Println(STR, "trying to use file store, but not open")
- return nil
- }
- filepath := fullpath(store.directory, key)
- if !exists(filepath) {
- return nil
- }
- mfile, oerr := os.Open(filepath)
- chkerr(oerr)
- msg, rerr := packets.ReadPacket(mfile)
- chkerr(mfile.Close())
- // Message was unreadable, return nil
- if rerr != nil {
- newpath := corruptpath(store.directory, key)
- WARN.Println(STR, "corrupted file detected:", rerr.Error(), "archived at:", newpath)
- if err := os.Rename(filepath, newpath); err != nil {
- ERROR.Println(STR, err)
- }
- return nil
- }
- return msg
- }
- // All will provide a list of all of the keys associated with messages
- // currently residing in the FileStore.
- func (store *FileStore) All() []string {
- store.RLock()
- defer store.RUnlock()
- return store.all()
- }
- // Del will remove the persisted message associated with the provided
- // key from the FileStore.
- func (store *FileStore) Del(key string) {
- store.Lock()
- defer store.Unlock()
- store.del(key)
- }
- // Reset will remove all persisted messages from the FileStore.
- func (store *FileStore) Reset() {
- store.Lock()
- defer store.Unlock()
- WARN.Println(STR, "FileStore Reset")
- for _, key := range store.all() {
- store.del(key)
- }
- }
- // lockless
- func (store *FileStore) all() []string {
- var err error
- var keys []string
- var files fileInfos
- if !store.opened {
- ERROR.Println(STR, "trying to use file store, but not open")
- return nil
- }
- files, err = ioutil.ReadDir(store.directory)
- chkerr(err)
- sort.Sort(files)
- for _, f := range files {
- DEBUG.Println(STR, "file in All():", f.Name())
- name := f.Name()
- if name[len(name)-4:] != msgExt {
- DEBUG.Println(STR, "skipping file, doesn't have right extension: ", name)
- continue
- }
- key := name[0 : len(name)-4] // remove file extension
- keys = append(keys, key)
- }
- return keys
- }
- // lockless
- func (store *FileStore) del(key string) {
- if !store.opened {
- ERROR.Println(STR, "trying to use file store, but not open")
- return
- }
- DEBUG.Println(STR, "store del filepath:", store.directory)
- DEBUG.Println(STR, "store delete key:", key)
- filepath := fullpath(store.directory, key)
- DEBUG.Println(STR, "path of deletion:", filepath)
- if !exists(filepath) {
- WARN.Println(STR, "store could not delete key:", key)
- return
- }
- rerr := os.Remove(filepath)
- chkerr(rerr)
- DEBUG.Println(STR, "del msg:", key)
- if exists(filepath) {
- ERROR.Println(STR, "file not deleted:", filepath)
- }
- }
- func fullpath(store string, key string) string {
- p := path.Join(store, key+msgExt)
- return p
- }
- func tmppath(store string, key string) string {
- p := path.Join(store, key+tmpExt)
- return p
- }
- func corruptpath(store string, key string) string {
- p := path.Join(store, key+corruptExt)
- return p
- }
- // create file called "X.[messageid].tmp" located in the store
- // the contents of the file is the bytes of the message, then
- // rename it to "X.[messageid].msg", overwriting any existing
- // message with the same id
- // X will be 'i' for inbound messages, and O for outbound messages
- func write(store, key string, m packets.ControlPacket) {
- temppath := tmppath(store, key)
- f, err := os.Create(temppath)
- chkerr(err)
- werr := m.Write(f)
- chkerr(werr)
- cerr := f.Close()
- chkerr(cerr)
- rerr := os.Rename(temppath, fullpath(store, key))
- chkerr(rerr)
- }
- func exists(file string) bool {
- if _, err := os.Stat(file); err != nil {
- if os.IsNotExist(err) {
- return false
- }
- chkerr(err)
- }
- return true
- }
- type fileInfos []os.FileInfo
- func (f fileInfos) Len() int {
- return len(f)
- }
- func (f fileInfos) Swap(i, j int) {
- f[i], f[j] = f[j], f[i]
- }
- func (f fileInfos) Less(i, j int) bool {
- return f[i].ModTime().Before(f[j].ModTime())
- }
|