1 /*******************************************************************************
2 # #
3 # MJPG-streamer allows to stream JPG frames from an input-plugin #
4 # to several output plugins #
5 # #
6 # Copyright (C) 2007 Tom Stöveken #
7 # #
8 # This program is free software; you can redistribute it and/or modify #
9 # it under the terms of the GNU General Public License as published by #
10 # the Free Software Foundation; version 2 of the License. #
11 # #
12 # This program is distributed in the hope that it will be useful, #
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15 # GNU General Public License for more details. #
16 # #
17 # You should have received a copy of the GNU General Public License #
18 # along with this program; if not, write to the Free Software #
19 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA #
20 # #
21 *******************************************************************************/
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25 #include <string.h>
26 #include <getopt.h>
27 #include <pthread.h>
28 #include <syslog.h>
29 #include <sys/types.h>
30 #include <sys/inotify.h>
31 #include <dirent.h>
32 #include <sys/stat.h>
33 #include <fcntl.h>
34
35 #include "../../mjpg_streamer.h"
36 #include "../../utils.h"
37
38 #define INPUT_PLUGIN_NAME "FILE input plugin"
39
40 typedef enum _read_mode {
41 NewFilesOnly,
42 ExistingFiles
43 } read_mode;
44
45 /* private functions and variables to this plugin */
46 static pthread_t worker;
47 static globals *pglobal;
48
49 void *worker_thread(void *);
50 void worker_cleanup(void *);
51 void help(void);
52
53 static double delay = 1.0;
54 static char *folder = NULL;
55 static char *filename = NULL;
56 static int rm = 0;
57 static int plugin_number;
58 static read_mode mode = NewFilesOnly;
59
60 /* global variables for this plugin */
61 static int fd, rc, wd, size;
62 static struct inotify_event *ev;
63
64 /*** plugin interface functions ***/
input_init(input_parameter * param,int id)65 int input_init(input_parameter *param, int id)
66 {
67 int i;
68 plugin_number = id;
69
70 param->argv[0] = INPUT_PLUGIN_NAME;
71
72 /* show all parameters for DBG purposes */
73 for(i = 0; i < param->argc; i++) {
74 DBG("argv[%d]=%s\n", i, param->argv[i]);
75 }
76
77 reset_getopt();
78 while(1) {
79 int option_index = 0, c = 0;
80 static struct option long_options[] = {
81 {"h", no_argument, 0, 0
82 },
83 {"help", no_argument, 0, 0},
84 {"d", required_argument, 0, 0},
85 {"delay", required_argument, 0, 0},
86 {"f", required_argument, 0, 0},
87 {"folder", required_argument, 0, 0},
88 {"r", no_argument, 0, 0},
89 {"remove", no_argument, 0, 0},
90 {"n", required_argument, 0, 0},
91 {"name", required_argument, 0, 0},
92 {"e", no_argument, 0, 0},
93 {"existing", no_argument, 0, 0},
94 {0, 0, 0, 0}
95 };
96
97 c = getopt_long_only(param->argc, param->argv, "", long_options, &option_index);
98
99 /* no more options to parse */
100 if(c == -1) break;
101
102 /* unrecognized option */
103 if(c == '?') {
104 help();
105 return 1;
106 }
107
108 switch(option_index) {
109 /* h, help */
110 case 0:
111 case 1:
112 DBG("case 0,1\n");
113 help();
114 return 1;
115 break;
116
117 /* d, delay */
118 case 2:
119 case 3:
120 DBG("case 2,3\n");
121 delay = atof(optarg);
122 break;
123
124 /* f, folder */
125 case 4:
126 case 5:
127 DBG("case 4,5\n");
128 folder = malloc(strlen(optarg) + 2);
129 strcpy(folder, optarg);
130 if(optarg[strlen(optarg)-1] != '/')
131 strcat(folder, "/");
132 break;
133
134 /* r, remove */
135 case 6:
136 case 7:
137 DBG("case 6,7\n");
138 rm = 1;
139 break;
140
141 /* n, name */
142 case 8:
143 case 9:
144 DBG("case 8,9\n");
145 filename = malloc(strlen(optarg) + 1);
146 strcpy(filename, optarg);
147 break;
148 /* e, existing */
149 case 10:
150 case 11:
151 DBG("case 10,11\n");
152 mode = ExistingFiles;
153 break;
154 default:
155 DBG("default case\n");
156 help();
157 return 1;
158 }
159 }
160
161 pglobal = param->global;
162
163 /* check for required parameters */
164 if(folder == NULL) {
165 IPRINT("ERROR: no folder specified\n");
166 return 1;
167 }
168
169 IPRINT("folder to watch...: %s\n", folder);
170 IPRINT("forced delay......: %.4f\n", delay);
171 IPRINT("delete file.......: %s\n", (rm) ? "yes, delete" : "no, do not delete");
172 IPRINT("filename must be..: %s\n", (filename == NULL) ? "-no filter for certain filename set-" : filename);
173
174 param->global->in[id].name = malloc((strlen(INPUT_PLUGIN_NAME) + 1) * sizeof(char));
175 sprintf(param->global->in[id].name, INPUT_PLUGIN_NAME);
176
177 return 0;
178 }
179
input_stop(int id)180 int input_stop(int id)
181 {
182 DBG("will cancel input thread\n");
183 pthread_cancel(worker);
184 return 0;
185 }
186
input_run(int id)187 int input_run(int id)
188 {
189 pglobal->in[id].buf = NULL;
190
191 if (mode == NewFilesOnly) {
192 rc = fd = inotify_init();
193 if(rc == -1) {
194 perror("could not initilialize inotify");
195 return 1;
196 }
197
198 rc = wd = inotify_add_watch(fd, folder, IN_CLOSE_WRITE | IN_MOVED_TO | IN_ONLYDIR);
199 if(rc == -1) {
200 perror("could not add watch");
201 return 1;
202 }
203
204 size = sizeof(struct inotify_event) + (1 << 16);
205 ev = malloc(size);
206 if(ev == NULL) {
207 perror("not enough memory");
208 return 1;
209 }
210 }
211
212 if(pthread_create(&worker, 0, worker_thread, NULL) != 0) {
213 free(pglobal->in[id].buf);
214 fprintf(stderr, "could not start worker thread\n");
215 exit(EXIT_FAILURE);
216 }
217
218 pthread_detach(worker);
219
220 return 0;
221 }
222
223 /*** private functions for this plugin below ***/
help(void)224 void help(void)
225 {
226 fprintf(stderr, " ---------------------------------------------------------------\n" \
227 " Help for input plugin..: "INPUT_PLUGIN_NAME"\n" \
228 " ---------------------------------------------------------------\n" \
229 " The following parameters can be passed to this plugin:\n\n" \
230 " [-d | --delay ]........: delay (in seconds) to pause between frames\n" \
231 " [-f | --folder ].......: folder to watch for new JPEG files\n" \
232 " [-r | --remove ].......: remove/delete JPEG file after reading\n" \
233 " [-n | --name ].........: ignore changes unless filename matches\n" \
234 " [-e | --existing ].....: serve the existing *.jpg files from the specified directory\n" \
235 " ---------------------------------------------------------------\n");
236 }
237
238 /* the single writer thread */
worker_thread(void * arg)239 void *worker_thread(void *arg)
240 {
241 char buffer[1<<16];
242 int file;
243 size_t filesize = 0;
244 struct stat stats;
245 struct dirent **fileList;
246 int fileCount = 0;
247 int currentFileNumber = 0;
248 char hasJpgFile = 0;
249 struct timeval timestamp;
250
251 if (mode == ExistingFiles) {
252 fileCount = scandir(folder, &fileList, 0, alphasort);
253 if (fileCount < 0) {
254 perror("error during scandir\n");
255 return NULL;
256 }
257 }
258
259 /* set cleanup handler to cleanup allocated resources */
260 pthread_cleanup_push(worker_cleanup, NULL);
261
262 while(!pglobal->stop) {
263 if (mode == NewFilesOnly) {
264 /* wait for new frame, read will block until something happens */
265 rc = read(fd, ev, size);
266 if(rc == -1) {
267 perror("reading inotify events failed\n");
268 break;
269 }
270
271 /* sanity check */
272 if(wd != ev->wd) {
273 fprintf(stderr, "This event is not for the watched directory (%d != %d)\n", wd, ev->wd);
274 continue;
275 }
276
277 if(ev->mask & (IN_IGNORED | IN_Q_OVERFLOW | IN_UNMOUNT)) {
278 fprintf(stderr, "event mask suggests to stop\n");
279 break;
280 }
281
282 /* prepare filename */
283 snprintf(buffer, sizeof(buffer), "%s%s", folder, ev->name);
284
285 /* check if the filename matches specified parameter (if given) */
286 if((filename != NULL) && (strcmp(filename, ev->name) != 0)) {
287 DBG("ignoring this change (specified filename does not match)\n");
288 continue;
289 }
290 DBG("new file detected: %s\n", buffer);
291 } else {
292 if ((strstr(fileList[currentFileNumber]->d_name, ".jpg") != NULL) ||
293 (strstr(fileList[currentFileNumber]->d_name, ".JPG") != NULL)) {
294 hasJpgFile = 1;
295 DBG("serving file: %s\n", fileList[currentFileNumber]->d_name);
296 snprintf(buffer, sizeof(buffer), "%s%s", folder, fileList[currentFileNumber]->d_name);
297 currentFileNumber++;
298 if (currentFileNumber == fileCount)
299 currentFileNumber = 0;
300 } else {
301 currentFileNumber++;
302 if (currentFileNumber == fileCount) {
303 if(hasJpgFile == 0) {
304 perror("No files with jpg/JPG extension in the folder\n");
305 goto thread_quit;
306 } else {
307 // There are some jpeg files, the last one just happens not to be one
308 currentFileNumber = 0;
309 }
310 }
311 continue;
312 }
313 }
314
315 /* open file for reading */
316 rc = file = open(buffer, O_RDONLY);
317 if(rc == -1) {
318 perror("could not open file for reading");
319 break;
320 }
321
322 /* approximate size of file */
323 rc = fstat(file, &stats);
324 if(rc == -1) {
325 perror("could not read statistics of file");
326 close(file);
327 break;
328 }
329
330 filesize = stats.st_size;
331
332 /* copy frame from file to global buffer */
333 pthread_mutex_lock(&pglobal->in[plugin_number].db);
334
335 /* allocate memory for frame */
336 if(pglobal->in[plugin_number].buf != NULL)
337 free(pglobal->in[plugin_number].buf);
338
339 pglobal->in[plugin_number].buf = malloc(filesize + (1 << 16));
340
341 if(pglobal->in[plugin_number].buf == NULL) {
342 fprintf(stderr, "could not allocate memory\n");
343 break;
344 }
345
346 if((pglobal->in[plugin_number].size = read(file, pglobal->in[plugin_number].buf, filesize)) == -1) {
347 perror("could not read from file");
348 free(pglobal->in[plugin_number].buf); pglobal->in[plugin_number].buf = NULL; pglobal->in[plugin_number].size = 0;
349 pthread_mutex_unlock(&pglobal->in[plugin_number].db);
350 close(file);
351 break;
352 }
353
354 gettimeofday(×tamp, NULL);
355 pglobal->in[plugin_number].timestamp = timestamp;
356 DBG("new frame copied (size: %d)\n", pglobal->in[plugin_number].size);
357 /* signal fresh_frame */
358 pthread_cond_broadcast(&pglobal->in[plugin_number].db_update);
359 pthread_mutex_unlock(&pglobal->in[plugin_number].db);
360
361 close(file);
362
363 /* delete file if necessary */
364 if(rm) {
365 rc = unlink(buffer);
366 if(rc == -1) {
367 perror("could not remove/delete file");
368 }
369 }
370
371 if(delay != 0)
372 usleep(1000 * 1000 * delay);
373 }
374
375 thread_quit:
376 while (fileCount--) {
377 free(fileList[fileCount]);
378 }
379 free(fileList);
380
381 DBG("leaving input thread, calling cleanup function now\n");
382 /* call cleanup handler, signal with the parameter */
383 pthread_cleanup_pop(1);
384
385 return NULL;
386 }
387
worker_cleanup(void * arg)388 void worker_cleanup(void *arg)
389 {
390 static unsigned char first_run = 1;
391
392 if(!first_run) {
393 DBG("already cleaned up resources\n");
394 return;
395 }
396
397 first_run = 0;
398 DBG("cleaning up resources allocated by input thread\n");
399
400 if(pglobal->in[plugin_number].buf != NULL) free(pglobal->in[plugin_number].buf);
401
402 free(ev);
403
404 if (mode == NewFilesOnly) {
405 rc = inotify_rm_watch(fd, wd);
406 if(rc == -1) {
407 perror("could not close watch descriptor");
408 }
409
410 rc = close(fd);
411 if(rc == -1) {
412 perror("could not close filedescriptor");
413 }
414 }
415 }
416
417
418
419
420
421