1 /*
2 * Copyright (c) 2009-2020, Peter Haag
3 * Copyright (c) 2004-2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
14 * * Neither the name of the author nor the names of its contributors may be
15 * used to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 *
30 */
31
32 #ifdef HAVE_CONFIG_H
33 #include "config.h"
34 #endif
35
36 #include <signal.h>
37 #include <sys/types.h>
38 #include <sys/time.h>
39 #include <sys/resource.h>
40 #include <sys/wait.h>
41 #include <sys/socket.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <string.h>
45 #include <stdarg.h>
46 #include <errno.h>
47 #include <unistd.h>
48
49 #ifdef HAVE_STDINT_H
50 #include <stdint.h>
51 #endif
52
53 #include "nfstatfile.h"
54 #include "bookkeeper.h"
55
56 #ifdef HAVE_FTS_H
57 # include <fts.h>
58 #else
59 # include "fts_compat.h"
60 #define fts_children fts_children_compat
61 #define fts_close fts_close_compat
62 #define fts_open fts_open_compat
63 #define fts_read fts_read_compat
64 #define fts_set fts_set_compat
65 #endif
66
67 #include "util.h"
68 #include "nfdump.h"
69 #include "nffile.h"
70 #include "expire.h"
71 #include "collector.h"
72 #include "launch.h"
73
74 #define DEVEL 1
75
76 static int done, launch, child_exit;
77
78 static void SignalHandler(int signal);
79
80 static char *cmd_expand(srecord_t *InfoRecord, char *ident, char *datadir, char *process);
81
82 static void cmd_parse(char *buf, char **args);
83
84 static void cmd_execute(char **args);
85
86 static void do_expire(char *datadir);
87
88 #define MAXARGS 256
89 #define MAXCMDLEN 4096
90
SignalHandler(int signal)91 static void SignalHandler(int signal) {
92
93 switch (signal) {
94 case SIGTERM:
95 // in case the process will not terminate, we
96 // kill the process directly after the 2nd TERM signal
97 if ( done > 1 )
98 exit(234);
99 done++;
100 break;
101 case SIGHUP:
102 launch = 1;
103 break;
104 case SIGCHLD:
105 child_exit++;
106 break;
107 }
108
109 } /* End of IntHandler */
110
111 /*
112 * Expand % placeholders in command string
113 * expand the memory needed in the command string and replace placeholders
114 * prevent endless expansion
115 */
cmd_expand(srecord_t * InfoRecord,char * ident,char * datadir,char * process)116 static char *cmd_expand(srecord_t *InfoRecord, char *ident, char *datadir, char *process) {
117 char *q, *s, tmp[16];
118 int i;
119
120 q = strdup(process);
121 if ( !q ) {
122 LogError("strdup() error in %s:%i: %s", __FILE__, __LINE__ , strerror(errno));
123 return NULL;
124 }
125 i = 0;
126
127 while ( q[i] ) {
128 if ( (q[i] == '%') && q[i+1] ) {
129 // replace the %x var
130 switch ( q[i+1] ) {
131 case 'd' :
132 s = datadir;
133 break;
134 case 'f' :
135 s = InfoRecord->fname;
136 break;
137 case 't' :
138 s = InfoRecord->tstring;
139 break;
140 case 'u' :
141 snprintf(tmp, 16, "%lli", (long long)InfoRecord->tstamp);
142 tmp[15] = 0;
143 s = tmp;
144 break;
145 case 'i' :
146 s = ident;
147 break;
148 default:
149 LogError("Unknown format token '%%%c'", q[i+1]);
150 s = NULL;
151 }
152 if ( s ) {
153 q = realloc(q, strlen(q) + strlen(s));
154 if ( !q ) {
155 LogError("realloc() error in %s:%i: %s", __FILE__, __LINE__ , strerror(errno));
156 return NULL;
157 }
158 // sanity check
159 if ( strlen(q) > MAXCMDLEN ) {
160 LogError("command expand error in %s:%i: cmd line too long", __FILE__, __LINE__);
161 return NULL;
162 }
163 memmove(&q[i] + strlen(s), &q[i+2], strlen(&q[i+2]) + 1); // include trailing '0' in memmove
164 memcpy(&q[i], s, strlen(s));
165 }
166 }
167 i++;
168 }
169
170 return q;
171
172 } // End of cmd_expand
173
174 /*
175 * split the command in buf into individual arguments.
176 */
cmd_parse(char * buf,char ** args)177 static void cmd_parse(char *buf, char **args) {
178 int i, argnum;
179
180 i = argnum = 0;
181 while ( (i < MAXCMDLEN) && (buf[i] != 0) ) {
182
183 /*
184 * Strip whitespace. Use nulls, so
185 * that the previous argument is terminated
186 * automatically.
187 */
188 while ( (i < MAXCMDLEN) && ((buf[i] == ' ') || (buf[i] == '\t')))
189 buf[i++] = 0;
190
191 /*
192 * Save the argument.
193 */
194 if ( argnum < MAXARGS )
195 args[argnum++] = &(buf[i]);
196
197 /*
198 * Skip over the argument.
199 */
200 while ( (i < MAXCMDLEN) && ((buf[i] != 0) && (buf[i] != ' ') && (buf[i] != '\t')))
201 i++;
202 }
203
204 if ( argnum < MAXARGS )
205 args[argnum] = NULL;
206
207 if ( (i >= MAXCMDLEN) || (argnum >= MAXARGS) ) {
208 // for safety reason, disable the command
209 args[0] = NULL;
210 LogError("Launcher: Unable to parse command: '%s'", buf);
211 }
212
213 } // End of cmd_parse
214
215 /*
216 * cmd_execute
217 * spawn a child process and execute the program.
218 */
cmd_execute(char ** args)219 static void cmd_execute(char **args) {
220 int pid;
221
222 // Get a child process.
223 LogInfo("Launcher: fork child.");
224 if ((pid = fork()) < 0) {
225 LogError("Can't fork: %s", strerror(errno));
226 return;
227 }
228
229 if (pid == 0) { // we are the child
230 execvp(*args, args);
231 LogError("Can't execvp: %s: %s", args[0], strerror(errno));
232 _exit(1);
233 }
234
235 // we are the parent
236 LogInfo("Launcher: child exec done.");
237 /* empty */
238
239 } // End of cmd_execute
240
do_expire(char * datadir)241 static void do_expire(char *datadir) {
242 bookkeeper_t *books;
243 dirstat_t *dirstat, oldstat;
244 int ret, bookkeeper_stat, do_rescan;
245
246 LogInfo("Run expire on '%s'", datadir);
247
248 do_rescan = 0;
249 ret = ReadStatInfo(datadir, &dirstat, CREATE_AND_LOCK);
250 switch (ret) {
251 case STATFILE_OK:
252 break;
253 case ERR_NOSTATFILE:
254 dirstat->low_water = 95;
255 case FORCE_REBUILD:
256 LogInfo("Force rebuild stat record");
257 do_rescan = 1;
258 break;
259 case ERR_FAIL:
260 LogError("expire failed: can't read stat record");
261 return;
262 /* not reached */
263 break;
264 default:
265 LogError("expire failed: unexpected return code %i reading stat record", ret);
266 return;
267 /* not reached */
268 }
269
270 bookkeeper_stat = AccessBookkeeper(&books, datadir);
271 if ( do_rescan ) {
272 RescanDir(datadir, dirstat);
273 if ( bookkeeper_stat == BOOKKEEPER_OK ) {
274 ClearBooks(books, NULL);
275 // release the books below
276 }
277 }
278
279 if ( bookkeeper_stat == BOOKKEEPER_OK ) {
280 bookkeeper_t tmp_books;
281 ClearBooks(books, &tmp_books);
282 UpdateBookStat(dirstat, &tmp_books);
283 ReleaseBookkeeper(books, DETACH_ONLY);
284 } else {
285 LogError("Error %i: can't access book keeping records", ret);
286 }
287
288 LogInfo("Limits: Filesize %s, Lifetime %s, Watermark: %llu%%\n",
289 dirstat->max_size ? ScaleValue(dirstat->max_size) : "<none>",
290 dirstat->max_lifetime ? ScaleTime(dirstat->max_lifetime) : "<none>",
291 (unsigned long long)dirstat->low_water);
292
293 LogInfo("Current size: %s, Current lifetime: %s, Number of files: %llu",
294 ScaleValue(dirstat->filesize),
295 ScaleTime(dirstat->last - dirstat->first),
296 (unsigned long long)dirstat->numfiles);
297
298 oldstat = *dirstat;
299 if ( dirstat->max_size || dirstat->max_lifetime )
300 ExpireDir(datadir, dirstat, dirstat->max_size, dirstat->max_lifetime, 0);
301 WriteStatInfo(dirstat);
302
303 if ( (oldstat.numfiles - dirstat->numfiles) > 0 ) {
304 LogInfo("expire completed");
305 LogInfo(" expired files: %llu", (unsigned long long)(oldstat.numfiles - dirstat->numfiles));
306 LogInfo(" expired time slot: %s", ScaleTime(dirstat->first - oldstat.first));
307 LogInfo(" expired file size: %s", ScaleValue(oldstat.filesize - dirstat->filesize));
308 LogInfo("New size: %s, New lifetime: %s, Number of files: %llu",
309 ScaleValue(dirstat->filesize),
310 ScaleTime(dirstat->last - dirstat->first),
311 (unsigned long long)dirstat->numfiles);
312 } else {
313 LogInfo("expire completed - nothing to expire.");
314 }
315 ReleaseStatInfo(dirstat);
316
317 } // End of do_expire
318
launcher(void * commbuff,FlowSource_t * FlowSource,char * process,int expire)319 void launcher (void *commbuff, FlowSource_t *FlowSource, char *process, int expire) {
320 FlowSource_t *fs;
321 struct sigaction act;
322 char *args[MAXARGS];
323 int pid, stat;
324 srecord_t *InfoRecord;
325
326 InfoRecord = (srecord_t *)commbuff;
327
328 LogInfo("Launcher: Startup. auto-expire %s", expire ? "enabled" : "off" );
329 done = launch = child_exit = 0;
330
331 // process may be NULL, if we only expire data files
332 if ( process ) {
333 char *cmd = NULL;
334 srecord_t TestRecord;
335 // check for valid command expansion
336 strncpy(TestRecord.fname, "test", MAXPATHLEN-1);
337 TestRecord.fname[MAXPATHLEN-1] = 0;
338 strncpy(TestRecord.tstring, "20190711084500", MAXTIMESTRING-1);
339 TestRecord.tstring[MAXTIMESTRING-1] = 0;
340 TestRecord.tstamp = 1;
341
342 // checkk valid command expansion
343 fs = FlowSource;
344 cmd = cmd_expand(&TestRecord, fs->Ident, fs->datadir, process);
345 if ( cmd == NULL ) {
346 LogError("Launcher: ident: %s, Unable to expand command: '%s'", fs->Ident, process);
347 exit(255);
348 }
349 free(cmd);
350 }
351
352 /* Signal handling */
353 memset((void *)&act,0,sizeof(struct sigaction));
354 act.sa_handler = SignalHandler;
355 sigemptyset(&act.sa_mask);
356 act.sa_flags = 0;
357 sigaction(SIGCHLD, &act, NULL); // child process terminated
358 sigaction(SIGTERM, &act, NULL); // we are done
359 sigaction(SIGINT, &act, NULL); // we are done
360 sigaction(SIGHUP, &act, NULL); // run command
361
362 while ( !done ) {
363 // sleep until we get signaled
364 dbg_printf("Launcher: Sleeping");
365 select(0, NULL, NULL, NULL, NULL);
366 dbg_printf("Launcher: Wakeup");
367 if ( launch ) { // SIGHUP
368 launch = 0;
369
370 if ( process ) {
371 char *cmd = NULL;
372
373 fs = FlowSource;
374 while ( fs ) {
375 // Expand % placeholders
376 cmd = cmd_expand(InfoRecord, fs->Ident, fs->datadir, process);
377 if ( cmd == NULL ) {
378 LogError("Launcher: ident: %s, Unable to expand command: '%s'", fs->Ident, process);
379 continue;
380 }
381 dbg_printf("Launcher: ident: %s run command: '%s'", fs->Ident, cmd);
382
383 // prepare args array
384 cmd_parse(cmd, args);
385 if ( args[0] )
386 cmd_execute(args);
387
388 // do not flood the system with new processes
389 sleep(1);
390 // else cmd_parse already reported the error
391 free(cmd);
392 fs = fs->next;
393 }
394 }
395
396 fs = FlowSource;
397 while ( fs ) {
398 if ( expire )
399 do_expire(fs->datadir);
400 fs = fs->next;
401 }
402 }
403 if ( child_exit ) {
404 LogInfo("launcher child exit %d children.", child_exit);
405 while ( (pid = waitpid (-1, &stat, WNOHANG)) > 0 ) {
406 if ( WIFEXITED(stat) ) {
407 LogInfo("launcher child %i exit status: %i", pid, WEXITSTATUS(stat));
408 }
409 if ( WIFSIGNALED(stat) ) {
410 LogError("launcher child %i died due to signal %i", pid, WTERMSIG(stat));
411 }
412
413 child_exit--;
414 }
415 LogInfo("launcher waiting children done. %d children", child_exit);
416 child_exit = 0;
417 }
418 if ( done ) {
419 LogInfo("Launcher: Terminating.");
420 }
421 }
422
423 waitpid (-1, &stat, 0);
424
425 // we are done
426 LogInfo("Launcher: exit.");
427
428 } // End of launcher
429
430