1/* 2 * Copyright (c) 2013 IBM Corp. 3 * 4 * All rights reserved. This program and the accompanying materials 5 * are made available under the terms of the Eclipse Public License v1.0 6 * which accompanies this distribution, and is available at 7 * http://www.eclipse.org/legal/epl-v10.html 8 * 9 * Contributors: 10 * Seth Hoenig 11 * Allan Stockdill-Mander 12 * Mike Robertson 13 */ 14 15package mqtt 16 17import ( 18 "io/ioutil" 19 "os" 20 "path" 21 "sync" 22 23 "github.com/eclipse/paho.mqtt.golang/packets" 24) 25 26const ( 27 msgExt = ".msg" 28 tmpExt = ".tmp" 29 corruptExt = ".CORRUPT" 30) 31 32// FileStore implements the store interface using the filesystem to provide 33// true persistence, even across client failure. This is designed to use a 34// single directory per running client. If you are running multiple clients 35// on the same filesystem, you will need to be careful to specify unique 36// store directories for each. 37type FileStore struct { 38 sync.RWMutex 39 directory string 40 opened bool 41} 42 43// NewFileStore will create a new FileStore which stores its messages in the 44// directory provided. 45func NewFileStore(directory string) *FileStore { 46 store := &FileStore{ 47 directory: directory, 48 opened: false, 49 } 50 return store 51} 52 53// Open will allow the FileStore to be used. 54func (store *FileStore) Open() { 55 store.Lock() 56 defer store.Unlock() 57 // if no store directory was specified in ClientOpts, by default use the 58 // current working directory 59 if store.directory == "" { 60 store.directory, _ = os.Getwd() 61 } 62 63 // if store dir exists, great, otherwise, create it 64 if !exists(store.directory) { 65 perms := os.FileMode(0770) 66 merr := os.MkdirAll(store.directory, perms) 67 chkerr(merr) 68 } 69 store.opened = true 70 DEBUG.Println(STR, "store is opened at", store.directory) 71} 72 73// Close will disallow the FileStore from being used. 74func (store *FileStore) Close() { 75 store.Lock() 76 defer store.Unlock() 77 store.opened = false 78 DEBUG.Println(STR, "store is closed") 79} 80 81// Put will put a message into the store, associated with the provided 82// key value. 83func (store *FileStore) Put(key string, m packets.ControlPacket) { 84 store.Lock() 85 defer store.Unlock() 86 if !store.opened { 87 ERROR.Println(STR, "Trying to use file store, but not open") 88 return 89 } 90 full := fullpath(store.directory, key) 91 write(store.directory, key, m) 92 if !exists(full) { 93 ERROR.Println(STR, "file not created:", full) 94 } 95} 96 97// Get will retrieve a message from the store, the one associated with 98// the provided key value. 99func (store *FileStore) Get(key string) packets.ControlPacket { 100 store.RLock() 101 defer store.RUnlock() 102 if !store.opened { 103 ERROR.Println(STR, "Trying to use file store, but not open") 104 return nil 105 } 106 filepath := fullpath(store.directory, key) 107 if !exists(filepath) { 108 return nil 109 } 110 mfile, oerr := os.Open(filepath) 111 chkerr(oerr) 112 msg, rerr := packets.ReadPacket(mfile) 113 chkerr(mfile.Close()) 114 115 // Message was unreadable, return nil 116 if rerr != nil { 117 newpath := corruptpath(store.directory, key) 118 WARN.Println(STR, "corrupted file detected:", rerr.Error(), "archived at:", newpath) 119 os.Rename(filepath, newpath) 120 return nil 121 } 122 return msg 123} 124 125// All will provide a list of all of the keys associated with messages 126// currenly residing in the FileStore. 127func (store *FileStore) All() []string { 128 store.RLock() 129 defer store.RUnlock() 130 return store.all() 131} 132 133// Del will remove the persisted message associated with the provided 134// key from the FileStore. 135func (store *FileStore) Del(key string) { 136 store.Lock() 137 defer store.Unlock() 138 store.del(key) 139} 140 141// Reset will remove all persisted messages from the FileStore. 142func (store *FileStore) Reset() { 143 store.Lock() 144 defer store.Unlock() 145 WARN.Println(STR, "FileStore Reset") 146 for _, key := range store.all() { 147 store.del(key) 148 } 149} 150 151// lockless 152func (store *FileStore) all() []string { 153 if !store.opened { 154 ERROR.Println(STR, "Trying to use file store, but not open") 155 return nil 156 } 157 keys := []string{} 158 files, rderr := ioutil.ReadDir(store.directory) 159 chkerr(rderr) 160 for _, f := range files { 161 DEBUG.Println(STR, "file in All():", f.Name()) 162 name := f.Name() 163 if name[len(name)-4:len(name)] != msgExt { 164 DEBUG.Println(STR, "skipping file, doesn't have right extension: ", name) 165 continue 166 } 167 key := name[0 : len(name)-4] // remove file extension 168 keys = append(keys, key) 169 } 170 return keys 171} 172 173// lockless 174func (store *FileStore) del(key string) { 175 if !store.opened { 176 ERROR.Println(STR, "Trying to use file store, but not open") 177 return 178 } 179 DEBUG.Println(STR, "store del filepath:", store.directory) 180 DEBUG.Println(STR, "store delete key:", key) 181 filepath := fullpath(store.directory, key) 182 DEBUG.Println(STR, "path of deletion:", filepath) 183 if !exists(filepath) { 184 WARN.Println(STR, "store could not delete key:", key) 185 return 186 } 187 rerr := os.Remove(filepath) 188 chkerr(rerr) 189 DEBUG.Println(STR, "del msg:", key) 190 if exists(filepath) { 191 ERROR.Println(STR, "file not deleted:", filepath) 192 } 193} 194 195func fullpath(store string, key string) string { 196 p := path.Join(store, key+msgExt) 197 return p 198} 199 200func tmppath(store string, key string) string { 201 p := path.Join(store, key+tmpExt) 202 return p 203} 204 205func corruptpath(store string, key string) string { 206 p := path.Join(store, key+corruptExt) 207 return p 208} 209 210// create file called "X.[messageid].tmp" located in the store 211// the contents of the file is the bytes of the message, then 212// rename it to "X.[messageid].msg", overwriting any existing 213// message with the same id 214// X will be 'i' for inbound messages, and O for outbound messages 215func write(store, key string, m packets.ControlPacket) { 216 temppath := tmppath(store, key) 217 f, err := os.Create(temppath) 218 chkerr(err) 219 werr := m.Write(f) 220 chkerr(werr) 221 cerr := f.Close() 222 chkerr(cerr) 223 rerr := os.Rename(temppath, fullpath(store, key)) 224 chkerr(rerr) 225} 226 227func exists(file string) bool { 228 if _, err := os.Stat(file); err != nil { 229 if os.IsNotExist(err) { 230 return false 231 } 232 chkerr(err) 233 } 234 return true 235} 236