1/*
2 * Copyright (c) 2013 IBM Corp.
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 *    Seth Hoenig
11 *    Allan Stockdill-Mander
12 *    Mike Robertson
13 */
14
15package mqtt
16
17import (
18	"io/ioutil"
19	"os"
20	"path"
21	"sync"
22
23	"github.com/eclipse/paho.mqtt.golang/packets"
24)
25
// File name suffixes used for the entries kept in a store directory.
const (
	// msgExt is appended to a key to name its persisted message file.
	msgExt = ".msg"
	// tmpExt names the scratch file a message is written to before it is
	// renamed to its msgExt name.
	tmpExt = ".tmp"
	// corruptExt names the file an unreadable message is moved aside to.
	corruptExt = ".CORRUPT"
)
31
// FileStore implements the store interface on top of the filesystem, so
// that persisted messages survive a failure of the client. It is designed
// around one directory per running client: when several clients share a
// filesystem, give each of them its own, unique store directory.
type FileStore struct {
	// RWMutex is taken by every exported method before it touches the
	// fields below or the files in the store directory.
	sync.RWMutex
	// directory is the path under which message files are kept.
	directory string
	// opened is set by Open and cleared by Close; operations on a store
	// that is not opened are refused.
	opened bool
}
42
43// NewFileStore will create a new FileStore which stores its messages in the
44// directory provided.
45func NewFileStore(directory string) *FileStore {
46	store := &FileStore{
47		directory: directory,
48		opened:    false,
49	}
50	return store
51}
52
53// Open will allow the FileStore to be used.
54func (store *FileStore) Open() {
55	store.Lock()
56	defer store.Unlock()
57	// if no store directory was specified in ClientOpts, by default use the
58	// current working directory
59	if store.directory == "" {
60		store.directory, _ = os.Getwd()
61	}
62
63	// if store dir exists, great, otherwise, create it
64	if !exists(store.directory) {
65		perms := os.FileMode(0770)
66		merr := os.MkdirAll(store.directory, perms)
67		chkerr(merr)
68	}
69	store.opened = true
70	DEBUG.Println(STR, "store is opened at", store.directory)
71}
72
73// Close will disallow the FileStore from being used.
74func (store *FileStore) Close() {
75	store.Lock()
76	defer store.Unlock()
77	store.opened = false
78	DEBUG.Println(STR, "store is closed")
79}
80
81// Put will put a message into the store, associated with the provided
82// key value.
83func (store *FileStore) Put(key string, m packets.ControlPacket) {
84	store.Lock()
85	defer store.Unlock()
86	if !store.opened {
87		ERROR.Println(STR, "Trying to use file store, but not open")
88		return
89	}
90	full := fullpath(store.directory, key)
91	write(store.directory, key, m)
92	if !exists(full) {
93		ERROR.Println(STR, "file not created:", full)
94	}
95}
96
97// Get will retrieve a message from the store, the one associated with
98// the provided key value.
99func (store *FileStore) Get(key string) packets.ControlPacket {
100	store.RLock()
101	defer store.RUnlock()
102	if !store.opened {
103		ERROR.Println(STR, "Trying to use file store, but not open")
104		return nil
105	}
106	filepath := fullpath(store.directory, key)
107	if !exists(filepath) {
108		return nil
109	}
110	mfile, oerr := os.Open(filepath)
111	chkerr(oerr)
112	msg, rerr := packets.ReadPacket(mfile)
113	chkerr(mfile.Close())
114
115	// Message was unreadable, return nil
116	if rerr != nil {
117		newpath := corruptpath(store.directory, key)
118		WARN.Println(STR, "corrupted file detected:", rerr.Error(), "archived at:", newpath)
119		os.Rename(filepath, newpath)
120		return nil
121	}
122	return msg
123}
124
125// All will provide a list of all of the keys associated with messages
126// currenly residing in the FileStore.
127func (store *FileStore) All() []string {
128	store.RLock()
129	defer store.RUnlock()
130	return store.all()
131}
132
133// Del will remove the persisted message associated with the provided
134// key from the FileStore.
135func (store *FileStore) Del(key string) {
136	store.Lock()
137	defer store.Unlock()
138	store.del(key)
139}
140
141// Reset will remove all persisted messages from the FileStore.
142func (store *FileStore) Reset() {
143	store.Lock()
144	defer store.Unlock()
145	WARN.Println(STR, "FileStore Reset")
146	for _, key := range store.all() {
147		store.del(key)
148	}
149}
150
151// lockless
152func (store *FileStore) all() []string {
153	if !store.opened {
154		ERROR.Println(STR, "Trying to use file store, but not open")
155		return nil
156	}
157	keys := []string{}
158	files, rderr := ioutil.ReadDir(store.directory)
159	chkerr(rderr)
160	for _, f := range files {
161		DEBUG.Println(STR, "file in All():", f.Name())
162		name := f.Name()
163		if name[len(name)-4:len(name)] != msgExt {
164			DEBUG.Println(STR, "skipping file, doesn't have right extension: ", name)
165			continue
166		}
167		key := name[0 : len(name)-4] // remove file extension
168		keys = append(keys, key)
169	}
170	return keys
171}
172
173// lockless
174func (store *FileStore) del(key string) {
175	if !store.opened {
176		ERROR.Println(STR, "Trying to use file store, but not open")
177		return
178	}
179	DEBUG.Println(STR, "store del filepath:", store.directory)
180	DEBUG.Println(STR, "store delete key:", key)
181	filepath := fullpath(store.directory, key)
182	DEBUG.Println(STR, "path of deletion:", filepath)
183	if !exists(filepath) {
184		WARN.Println(STR, "store could not delete key:", key)
185		return
186	}
187	rerr := os.Remove(filepath)
188	chkerr(rerr)
189	DEBUG.Println(STR, "del msg:", key)
190	if exists(filepath) {
191		ERROR.Println(STR, "file not deleted:", filepath)
192	}
193}
194
195func fullpath(store string, key string) string {
196	p := path.Join(store, key+msgExt)
197	return p
198}
199
200func tmppath(store string, key string) string {
201	p := path.Join(store, key+tmpExt)
202	return p
203}
204
205func corruptpath(store string, key string) string {
206	p := path.Join(store, key+corruptExt)
207	return p
208}
209
210// create file called "X.[messageid].tmp" located in the store
211// the contents of the file is the bytes of the message, then
212// rename it to "X.[messageid].msg", overwriting any existing
213// message with the same id
214// X will be 'i' for inbound messages, and O for outbound messages
215func write(store, key string, m packets.ControlPacket) {
216	temppath := tmppath(store, key)
217	f, err := os.Create(temppath)
218	chkerr(err)
219	werr := m.Write(f)
220	chkerr(werr)
221	cerr := f.Close()
222	chkerr(cerr)
223	rerr := os.Rename(temppath, fullpath(store, key))
224	chkerr(rerr)
225}
226
227func exists(file string) bool {
228	if _, err := os.Stat(file); err != nil {
229		if os.IsNotExist(err) {
230			return false
231		}
232		chkerr(err)
233	}
234	return true
235}
236