1 #include <string>
2 #include <iostream>
3 #include <chrono>
4 #include <thread>
5 #include <stdexcept>
6 #include <limits>
7 #include <vector>
8 #include <algorithm>
9
10 #include <cstdio>
11 #include <cstring>
12 #ifndef WIN32
13 #include <fcntl.h>
14 #include <unistd.h>
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #endif
18
19 #include "multiprocessing.h"
20 // #define DEBUG
21 #undef DEBUG
22 #include "filestack.h"
23
24 using namespace std;
25
26
// Backing file used by the default constructor when the caller names none.
const string default_file_name = "default_stack.idx";
// Default maximum line length in bytes; size() also uses it as its read-chunk size.
const int default_max_line_length = 4096;
29
30
FileStack()31 FileStack::FileStack() : FileStack::FileStack(default_file_name) {
32 std::cerr << "FileStack: Using default file name " << default_file_name << std::endl;
33 }
34
FileStack(const string & file_name)35 FileStack::FileStack(const string & file_name) : FileStack::FileStack(file_name, default_max_line_length) {
36 }
37
FileStack(const string & file_name,int maximum_line_length)38 FileStack::FileStack(const string & file_name, int maximum_line_length) {
39 #ifndef WIN32
40 DBG("");
41 fd = open(file_name.c_str(), O_RDWR | O_CREAT, 00664);
42 if (fd == -1) {
43 throw(std::runtime_error("could not open file " + file_name));
44 }
45 this->locked = false;
46 this->file_name = file_name;
47 set_max_line_length(maximum_line_length);
48 #endif
49 }
50
~FileStack()51 FileStack::~FileStack() {
52 DBG("");
53 #ifndef WIN32
54 close(fd);
55 #endif
56 }
57
58
59
lock()60 int FileStack::lock() {
61 #ifndef WIN32
62 DBG("");
63 int fcntl_status = -1;
64 if (fd >= 0) {
65 memset(&lck, 0, sizeof(lck));
66 lck.l_type = F_WRLCK; // exclusive lock
67 lck.l_whence = SEEK_SET;
68 lck.l_start = 0;
69 lck.l_len = 0;
70 fcntl_status = fcntl(fd, F_SETLKW, &lck);
71 if (fcntl_status == -1) {
72 throw(std::runtime_error("could not put lock on file " + file_name));
73 } else {
74 locked = true;
75 }
76 } else {
77 throw(std::runtime_error("could not put lock on non-open file " + file_name));
78 }
79 return fcntl_status;
80 #else
81 return 0;
82 #endif
83 }
84
unlock()85 int FileStack::unlock() {
86 #ifndef WIN32
87 DBG("");
88 int fcntl_status = -1;
89 if (fd >= 0) {
90 lck.l_type = F_UNLCK;
91 fcntl_status = fcntl(fd, F_SETLKW, &lck);
92 if (fcntl_status == -1) {
93 throw(std::runtime_error("could not unlock file " + file_name));
94 } else {
95 locked = false;
96 }
97 }
98 return fcntl_status;
99 #else
100 return 0;
101 #endif
102 }
103
pop_non_locked(string & buf,const bool keep_flag,size_t & size_after_pop)104 int FileStack::pop_non_locked(string & buf, const bool keep_flag, size_t & size_after_pop) {
105 #ifndef WIN32
106 DBG("");
107 buf.clear();
108 int stat = 0;
109 const off_t size = lseek(fd, 0, SEEK_END);
110 if (size > 0) {
111 off_t jmp = size - max_line_length;
112 if (jmp < 0) jmp = 0;
113 lseek(fd, jmp, SEEK_SET);
114
115 char * raw = new char[max_line_length * sizeof(char)];
116 const ssize_t n_read = read(fd, raw, max_line_length);
117 string chunk;
118 chunk.assign(raw, n_read);
119 delete [] raw;
120
121 size_t begin = 0, end = 0;
122 const char key = '\n';
123 size_t found = chunk.rfind(key);
124 if (found != string::npos) {
125 end = found;
126 }
127 if (end > 0) {
128 found = chunk.rfind(key, end-1);
129 if (found != string::npos) {
130 begin = found + 1;
131 }
132 }
133 const size_t line_size = end - begin + 1;
134 if (line_size > 0) {
135 buf.assign(chunk, begin, line_size - 1);
136 if (! keep_flag) {
137 stat = ftruncate(fd, size - line_size);
138 }
139 }
140 }
141 if (size_after_pop != numeric_limits<size_t>::max()) {
142 size_after_pop = this->size();
143 }
144 if (stat == -1) {
145 return stat;
146 } else {
147 DBG(buf);
148 return buf.size();
149 }
150 #else
151 return 0;
152 #endif
153 }
154
pop_non_locked(string & buf)155 int FileStack::pop_non_locked(string & buf) {
156 DBG("");
157 size_t size_after_pop = numeric_limits<size_t>::max();
158 return pop_non_locked(buf, false, size_after_pop);
159 }
160
pop(string & buf,const bool keep_flag,size_t & size_after_pop)161 int FileStack::pop(string & buf, const bool keep_flag, size_t & size_after_pop) {
162 #ifndef WIN32
163 DBG("");
164 bool locked_internally = false;
165 if (! locked) {
166 lock();
167 locked_internally = true;
168 }
169 int val = pop_non_locked(buf, keep_flag, size_after_pop);
170 if (locked_internally) {
171 unlock();
172 }
173 return val;
174 #else
175 return 0;
176 #endif
177 }
178
pop(string & buf)179 int FileStack::pop(string & buf) {
180 DBG("");
181 size_t size_after_pop = numeric_limits<size_t>::max();
182 return pop(buf, false, size_after_pop);
183 }
184
top(string & buf)185 int FileStack::top(string & buf) {
186 DBG("");
187 size_t size_after_pop = numeric_limits<size_t>::max();
188 return pop(buf, true, size_after_pop);
189 }
190
pop(string & buf,size_t & size_after_pop)191 int FileStack::pop(string & buf, size_t & size_after_pop) {
192 DBG("");
193 return pop(buf, false, size_after_pop);
194 }
195
pop(int & i)196 int FileStack::pop(int & i) {
197 DBG("");
198 string buf;
199 size_t size_after_pop = numeric_limits<size_t>::max();
200 const int get_status = pop(buf, false, size_after_pop);
201 if (get_status > 0) {
202 return i = stoi(buf);
203 } else {
204 return -1;
205 }
206 }
207
top(int & i)208 int FileStack::top(int & i) {
209 DBG("");
210 string buf;
211 size_t size_after_pop = numeric_limits<size_t>::max();
212 const int get_status = pop(buf, true, size_after_pop);
213 if (get_status > 0) {
214 return i = stoi(buf);
215 } else {
216 return -1;
217 }
218 }
219
remove(const string & line)220 int FileStack::remove(const string & line) {
221 DBG("");
222 #ifndef WIN32
223 bool locked_internally = false;
224 if (! locked) {
225 lock();
226 locked_internally = true;
227 }
228 const off_t size = lseek(fd, 0, SEEK_END);
229 lseek(fd, 0, SEEK_SET);
230
231 char * raw = new char[size * sizeof(char)];
232 const ssize_t n_read = read(fd, raw, size);
233 string buf;
234 buf.assign(raw, n_read);
235 delete [] raw;
236
237 vector<string> tokens = split(buf, '\n');
238 buf.clear();
239
240 tokens.erase(std::remove(tokens.begin(), tokens.end(), line), tokens.end());
241
242 lseek(fd, 0, SEEK_SET);
243 int stat = ftruncate(fd, 0);
244 for (auto it = tokens.begin(); it != tokens.end(); ++it) {
245 buf = *it + '\n';
246 size_t n = write(fd, buf.c_str(), buf.size());
247 }
248
249 if (locked_internally) {
250 unlock();
251 }
252 #endif
253 return 0;
254 }
255
push_non_locked(const string & buf)256 int FileStack::push_non_locked(const string & buf) {
257 DBG("");
258 static const string nl("\n");
259 #ifndef WIN32
260 lseek(fd, 0, SEEK_END);
261 size_t n = write(fd, buf.c_str(), buf.size());
262 if (buf.back() != nl.back()) {
263 n += write(fd, nl.c_str(), nl.size());
264 }
265 return n;
266 #else
267 return 0;
268 #endif
269 }
270
push(const string & buf,size_t & size_after_push)271 int FileStack::push(const string & buf, size_t & size_after_push) {
272 DBG("");
273 bool locked_internally = false;
274 if (! locked) {
275 lock();
276 locked_internally = true;
277 }
278 size_t n = push_non_locked(buf);
279 if (size_after_push != numeric_limits<size_t>::max()) {
280 size_after_push = size();
281 }
282 if (locked_internally) {
283 unlock();
284 }
285 return n;
286 }
287
push(const string & buf)288 int FileStack::push(const string & buf) {
289 DBG("");
290 size_t size_after_push = numeric_limits<size_t>::max();
291 return push(buf, size_after_push);
292 }
293
push(int i)294 int FileStack::push(int i) {
295 DBG("");
296 string buf = to_string(i);
297 return push(buf);
298 }
299
300
301
size()302 size_t FileStack::size() {
303 DBG("");
304 size_t n_bytes, i, c = 0;
305 const size_t chunk_size = default_max_line_length;
306 char * raw = new char[chunk_size * sizeof(char)];
307
308 bool locked_internally = false;
309 if (! locked) {
310 lock();
311 locked_internally = true;
312 }
313
314 #ifndef WIN32
315 lseek(fd, 0, SEEK_SET);
316 while ((n_bytes = read(fd, raw, chunk_size)) > 0) {
317 for (i=0; i<n_bytes; i++) {
318 if (raw[i] == '\n') {
319 c++;
320 }
321 }
322 }
323 #endif
324
325 if (locked_internally) {
326 unlock();
327 }
328
329 delete [] raw;
330 return c;
331 }
332
clear()333 int FileStack::clear() {
334 DBG("");
335
336 bool locked_internally = false;
337 if (! locked) {
338 lock();
339 locked_internally = true;
340 }
341
342 #ifndef WIN32
343 lseek(fd, 0, SEEK_SET);
344 int stat = ftruncate(fd, 0);
345 #else
346 int stat = 0;
347 #endif
348
349 if (locked_internally) {
350 unlock();
351 }
352
353 return stat;
354 }
355
356
357
poll_query(const string & query,const double sleep_s,const size_t max_iter)358 bool FileStack::poll_query(const string & query, const double sleep_s, const size_t max_iter) {
359 DBG("");
360 string buf;
361 const chrono::duration<double> sleep_time(sleep_s);
362 for (size_t i=0; i < max_iter; ++i) {
363 top(buf);
364 if (buf.find(query) != string::npos) {
365 DBG(string("") + " success : poll_iteration=" + to_string(i) + ", query=" + "\"" + query + "\"");
366 return true;
367 } else {
368 DBG(string("") + " ongoing : poll_iteration=" + to_string(i) + ", query=" + "\"" + query + "\"");
369 }
370 if (buf.find("STOP") != string::npos) {
371 throw(runtime_error("STOP on FileStack " + file_name));
372 }
373 this_thread::sleep_for(sleep_time);
374 }
375 throw(runtime_error("Could not discover keyword " + query + " on FileStack " + file_name
376 + " within " + to_string(double(max_iter) * sleep_s) + " seconds."));
377 return false; // TODO : finally decide on the semantics
378 };
379
poll_size(const size_t size,const double sleep_s,const size_t max_iter)380 bool FileStack::poll_size(const size_t size, const double sleep_s, const size_t max_iter) {
381 DBG("");
382 string buf;
383 const chrono::duration<double> sleep_time(sleep_s);
384 for (size_t i=0; i < max_iter; ++i) {
385 if (this->size() == size) {
386 return true;
387 }
388 this_thread::sleep_for(sleep_time);
389 }
390 throw(runtime_error("Could not detect size " + to_string(size) + " of FileStack " + file_name
391 + " within " + to_string(double(max_iter) * sleep_s) + " seconds."));
392 return false; // TODO : finally decide on the semantics
393 };
394
395
396
set_max_line_length(int n)397 int FileStack::set_max_line_length(int n) {
398 DBG("");
399 const int minimum_line_length = 8;
400 if (n < minimum_line_length) {
401 n = minimum_line_length;
402 }
403 max_line_length = n;
404 return max_line_length;
405 }
406
get_max_line_length()407 int FileStack::get_max_line_length() {
408 DBG("");
409 return max_line_length;
410 }
411