1 #include <string>
2 #include <iostream>
3 #include <chrono>
4 #include <thread>
5 #include <stdexcept>
6 #include <limits>
7 #include <vector>
8 #include <algorithm>
9 
10 #include <cstdio>
11 #include <cstring>
12 #ifndef WIN32
13 #include <fcntl.h>
14 #include <unistd.h>
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #endif
18 
19 #include "multiprocessing.h"
20 // #define DEBUG
21 #undef DEBUG
22 #include "filestack.h"
23 
24 using namespace std;
25 
26 
27 const string default_file_name = "default_stack.idx";
28 const int default_max_line_length = 4096;
29 
30 
FileStack()31 FileStack::FileStack() : FileStack::FileStack(default_file_name) {
32     std::cerr << "FileStack: Using default file name " << default_file_name << std::endl;
33 }
34 
FileStack(const string & file_name)35 FileStack::FileStack(const string & file_name) : FileStack::FileStack(file_name, default_max_line_length) {
36 }
37 
FileStack(const string & file_name,int maximum_line_length)38 FileStack::FileStack(const string & file_name, int maximum_line_length) {
39 #ifndef WIN32
40     DBG("");
41     fd = open(file_name.c_str(), O_RDWR | O_CREAT, 00664);
42     if (fd == -1) {
43         throw(std::runtime_error("could not open file " + file_name));
44     }
45     this->locked = false;
46     this->file_name = file_name;
47     set_max_line_length(maximum_line_length);
48 #endif
49 }
50 
~FileStack()51 FileStack::~FileStack() {
52     DBG("");
53 #ifndef WIN32
54     close(fd);
55 #endif
56 }
57 
58 
59 
lock()60 int FileStack::lock() {
61 #ifndef WIN32
62     DBG("");
63     int fcntl_status = -1;
64     if (fd >= 0) {
65         memset(&lck, 0, sizeof(lck));
66         lck.l_type = F_WRLCK; // exclusive lock
67         lck.l_whence = SEEK_SET;
68         lck.l_start = 0;
69         lck.l_len = 0;
70         fcntl_status = fcntl(fd, F_SETLKW, &lck);
71         if (fcntl_status == -1) {
72             throw(std::runtime_error("could not put lock on file " + file_name));
73         } else {
74             locked = true;
75         }
76     } else {
77         throw(std::runtime_error("could not put lock on non-open file " + file_name));
78     }
79     return fcntl_status;
80 #else
81     return 0;
82 #endif
83 }
84 
unlock()85 int FileStack::unlock() {
86 #ifndef WIN32
87     DBG("");
88     int fcntl_status = -1;
89     if (fd >= 0) {
90         lck.l_type = F_UNLCK;
91         fcntl_status = fcntl(fd, F_SETLKW, &lck);
92         if (fcntl_status == -1) {
93             throw(std::runtime_error("could not unlock file " + file_name));
94         } else {
95             locked = false;
96         }
97     }
98     return fcntl_status;
99 #else
100     return 0;
101 #endif
102 }
103 
pop_non_locked(string & buf,const bool keep_flag,size_t & size_after_pop)104 int FileStack::pop_non_locked(string & buf, const bool keep_flag, size_t & size_after_pop) {
105 #ifndef WIN32
106     DBG("");
107     buf.clear();
108     int stat = 0;
109     const off_t size = lseek(fd, 0, SEEK_END);
110     if (size > 0) {
111         off_t jmp = size - max_line_length;
112         if (jmp < 0) jmp = 0;
113         lseek(fd, jmp, SEEK_SET);
114 
115         char * raw = new char[max_line_length * sizeof(char)];
116         const ssize_t n_read = read(fd, raw, max_line_length);
117         string chunk;
118         chunk.assign(raw, n_read);
119         delete [] raw;
120 
121         size_t begin = 0, end = 0;
122         const char key = '\n';
123         size_t found = chunk.rfind(key);
124         if (found != string::npos) {
125             end = found;
126         }
127         if (end > 0) {
128             found = chunk.rfind(key, end-1);
129             if (found != string::npos) {
130                 begin = found + 1;
131             }
132         }
133         const size_t line_size = end - begin + 1;
134         if (line_size > 0) {
135             buf.assign(chunk, begin, line_size - 1);
136             if (! keep_flag) {
137                 stat = ftruncate(fd, size - line_size);
138             }
139         }
140     }
141     if (size_after_pop != numeric_limits<size_t>::max()) {
142         size_after_pop = this->size();
143     }
144     if (stat == -1) {
145         return stat;
146     } else {
147         DBG(buf);
148         return buf.size();
149     }
150 #else
151     return 0;
152 #endif
153 }
154 
pop_non_locked(string & buf)155 int FileStack::pop_non_locked(string & buf) {
156     DBG("");
157     size_t size_after_pop = numeric_limits<size_t>::max();
158     return pop_non_locked(buf, false, size_after_pop);
159 }
160 
pop(string & buf,const bool keep_flag,size_t & size_after_pop)161 int FileStack::pop(string & buf, const bool keep_flag, size_t & size_after_pop) {
162 #ifndef WIN32
163     DBG("");
164     bool locked_internally = false;
165     if (! locked) {
166         lock();
167         locked_internally = true;
168     }
169     int val = pop_non_locked(buf, keep_flag, size_after_pop);
170     if (locked_internally) {
171         unlock();
172     }
173     return val;
174 #else
175     return 0;
176 #endif
177 }
178 
pop(string & buf)179 int FileStack::pop(string & buf) {
180     DBG("");
181     size_t size_after_pop = numeric_limits<size_t>::max();
182     return pop(buf, false, size_after_pop);
183 }
184 
top(string & buf)185 int FileStack::top(string & buf) {
186     DBG("");
187     size_t size_after_pop = numeric_limits<size_t>::max();
188     return pop(buf, true, size_after_pop);
189 }
190 
pop(string & buf,size_t & size_after_pop)191 int FileStack::pop(string & buf, size_t & size_after_pop) {
192     DBG("");
193     return pop(buf, false, size_after_pop);
194 }
195 
pop(int & i)196 int FileStack::pop(int & i) {
197     DBG("");
198     string buf;
199     size_t size_after_pop = numeric_limits<size_t>::max();
200     const int get_status = pop(buf, false, size_after_pop);
201     if (get_status > 0) {
202         return i = stoi(buf);
203     } else {
204         return -1;
205     }
206 }
207 
top(int & i)208 int FileStack::top(int & i) {
209     DBG("");
210     string buf;
211     size_t size_after_pop = numeric_limits<size_t>::max();
212     const int get_status = pop(buf, true, size_after_pop);
213     if (get_status > 0) {
214         return i = stoi(buf);
215     } else {
216         return -1;
217     }
218 }
219 
remove(const string & line)220 int FileStack::remove(const string & line) {
221     DBG("");
222 #ifndef WIN32
223     bool locked_internally = false;
224     if (! locked) {
225         lock();
226         locked_internally = true;
227     }
228     const off_t size = lseek(fd, 0, SEEK_END);
229     lseek(fd, 0, SEEK_SET);
230 
231     char * raw = new char[size * sizeof(char)];
232     const ssize_t n_read = read(fd, raw, size);
233     string buf;
234     buf.assign(raw, n_read);
235     delete [] raw;
236 
237     vector<string> tokens = split(buf, '\n');
238     buf.clear();
239 
240     tokens.erase(std::remove(tokens.begin(), tokens.end(), line), tokens.end());
241 
242     lseek(fd, 0, SEEK_SET);
243     int stat = ftruncate(fd, 0);
244     for (auto it = tokens.begin(); it != tokens.end(); ++it) {
245         buf = *it + '\n';
246         size_t n = write(fd, buf.c_str(), buf.size());
247     }
248 
249     if (locked_internally) {
250         unlock();
251     }
252 #endif
253     return 0;
254 }
255 
push_non_locked(const string & buf)256 int FileStack::push_non_locked(const string & buf) {
257     DBG("");
258     static const string nl("\n");
259 #ifndef WIN32
260     lseek(fd, 0, SEEK_END);
261     size_t n = write(fd, buf.c_str(), buf.size());
262     if (buf.back() != nl.back()) {
263         n += write(fd, nl.c_str(), nl.size());
264     }
265     return n;
266 #else
267 	return 0;
268 #endif
269 }
270 
push(const string & buf,size_t & size_after_push)271 int FileStack::push(const string & buf, size_t & size_after_push) {
272     DBG("");
273     bool locked_internally = false;
274     if (! locked) {
275         lock();
276         locked_internally = true;
277     }
278     size_t n = push_non_locked(buf);
279     if (size_after_push != numeric_limits<size_t>::max()) {
280         size_after_push = size();
281     }
282     if (locked_internally) {
283         unlock();
284     }
285     return n;
286 }
287 
push(const string & buf)288 int FileStack::push(const string & buf) {
289     DBG("");
290     size_t size_after_push = numeric_limits<size_t>::max();
291     return push(buf, size_after_push);
292 }
293 
push(int i)294 int FileStack::push(int i) {
295     DBG("");
296     string buf = to_string(i);
297     return push(buf);
298 }
299 
300 
301 
size()302 size_t FileStack::size() {
303     DBG("");
304     size_t n_bytes, i, c = 0;
305     const size_t chunk_size = default_max_line_length;
306     char * raw = new char[chunk_size * sizeof(char)];
307 
308     bool locked_internally = false;
309     if (! locked) {
310         lock();
311         locked_internally = true;
312     }
313 
314 #ifndef WIN32
315     lseek(fd, 0, SEEK_SET);
316     while ((n_bytes = read(fd, raw, chunk_size)) > 0) {
317         for (i=0; i<n_bytes; i++) {
318             if (raw[i] == '\n') {
319                 c++;
320             }
321         }
322     }
323 #endif
324 
325     if (locked_internally) {
326         unlock();
327     }
328 
329     delete [] raw;
330     return c;
331 }
332 
clear()333 int FileStack::clear() {
334     DBG("");
335 
336     bool locked_internally = false;
337     if (! locked) {
338         lock();
339         locked_internally = true;
340     }
341 
342 #ifndef WIN32
343     lseek(fd, 0, SEEK_SET);
344     int stat = ftruncate(fd, 0);
345 #else
346     int stat = 0;
347 #endif
348 
349     if (locked_internally) {
350         unlock();
351     }
352 
353     return stat;
354 }
355 
356 
357 
poll_query(const string & query,const double sleep_s,const size_t max_iter)358 bool FileStack::poll_query(const string & query, const double sleep_s, const size_t max_iter) {
359     DBG("");
360     string buf;
361     const chrono::duration<double> sleep_time(sleep_s);
362     for (size_t i=0; i < max_iter; ++i) {
363         top(buf);
364         if (buf.find(query) != string::npos) {
365             DBG(string("") + " success : poll_iteration=" + to_string(i) + ", query=" + "\"" + query + "\"");
366             return true;
367         } else {
368             DBG(string("") + " ongoing : poll_iteration=" + to_string(i) + ", query=" + "\"" + query + "\"");
369         }
370         if (buf.find("STOP") != string::npos) {
371             throw(runtime_error("STOP on FileStack " + file_name));
372         }
373         this_thread::sleep_for(sleep_time);
374     }
375     throw(runtime_error("Could not discover keyword " + query + " on FileStack " + file_name
376                       + " within " + to_string(double(max_iter) * sleep_s) + " seconds."));
377     return false;  // TODO : finally decide on the semantics
378 };
379 
poll_size(const size_t size,const double sleep_s,const size_t max_iter)380 bool FileStack::poll_size(const size_t size, const double sleep_s, const size_t max_iter) {
381     DBG("");
382     string buf;
383     const chrono::duration<double> sleep_time(sleep_s);
384     for (size_t i=0; i < max_iter; ++i) {
385         if (this->size() == size) {
386             return true;
387         }
388         this_thread::sleep_for(sleep_time);
389     }
390     throw(runtime_error("Could not detect size " + to_string(size) + " of FileStack " + file_name
391                       + " within " + to_string(double(max_iter) * sleep_s) + " seconds."));
392     return false;  // TODO : finally decide on the semantics
393 };
394 
395 
396 
set_max_line_length(int n)397 int FileStack::set_max_line_length(int n) {
398     DBG("");
399     const int minimum_line_length = 8;
400     if (n < minimum_line_length) {
401         n = minimum_line_length;
402     }
403     max_line_length = n;
404     return max_line_length;
405 }
406 
get_max_line_length()407 int FileStack::get_max_line_length() {
408     DBG("");
409     return max_line_length;
410 }
411