1 // ------------------------------------------------------------------------
2 // audioio-forked-streams.cpp: Helper class providing routines for
3 //                             forking for piped input/output.
4 // Copyright (C) 2000-2004,2006,2008,2011 Kai Vehmanen
5 //
6 // Attributes:
7 //     eca-style-version: 3 (see Ecasound Programmer's Guide)
8 //
9 // This program is free software; you can redistribute it and/or modify
10 // it under the terms of the GNU General Public License as published by
11 // the Free Software Foundation; either version 2 of the License, or
12 // (at your option) any later version.
13 //
14 // This program is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 // GNU General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
22 // ------------------------------------------------------------------------
23 
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27 
28 #include <cstdlib>
29 #include <vector>
30 #include <string>
31 #include <cstring>
32 #include <iostream>
33 #include <cstring>
34 #include <cstdio>
35 
36 #include <sys/stat.h>
37 #include <sys/types.h>
38 #include <sys/wait.h>
39 #include <fcntl.h>
40 #include <signal.h>
41 #include <unistd.h>
42 #include <errno.h>
43 
44 #include <kvu_dbc.h>
45 #include <kvu_numtostr.h>
46 #include <kvu_utils.h>
47 
48 #include "eca-logger.h"
49 #include "audioio-forked-stream.h"
50 
51 using namespace std;
52 
/**
 * Upper bound for the number of argument tokens that are
 * handed over to exec(). Command lines with more tokens
 * are truncated by afs_run_exec().
 */
static const int afs_max_exec_args = 1024;
57 
58 /**
59  * Runs exec() with the given parameters.
60  * @return exec() return value
61  */
afs_run_exec(const string & command,const string & filename)62 static int afs_run_exec(const string& command, const string& filename)
63 {
64   vector<string> temp = kvu_string_to_tokens_quoted(command);
65   if (static_cast<int>(temp.size()) > afs_max_exec_args) {
66     temp.resize(afs_max_exec_args);
67     ECA_LOG_MSG(ECA_LOGGER::info, "WARNING: too many arguments for external application, truncating.");
68   }
69   const char* args[afs_max_exec_args];
70   vector<string>::size_type p = 0;
71   while(p < temp.size()) {
72     if (temp[p].find("%f") != string::npos) {
73       temp[p].replace(temp[p].find("%f"), 2, filename);
74       args[p] = temp[p].c_str();
75     }
76     else
77       args[p] = temp[p].c_str();
78     ++p;
79   }
80   args[p] = 0;
81   return execvp(temp[0].c_str(), const_cast<char**>(args));
82 }
83 
afs_fd_set_cloexec(int fd)84 static int afs_fd_set_cloexec(int fd)
85 {
86   int flags;
87   flags = fcntl(fd, F_GETFD);
88   if (flags >= 0) {
89     flags |= FD_CLOEXEC;
90     if (fcntl(fd, F_SETFD, flags) >= 0)
91       return 0;
92   }
93   ECA_LOG_MSG(ECA_LOGGER::info, "unable to set FD_CLOEXEC: " +
94 	      std::string(strerror(errno)));
95   return -1;
96 }
97 
~AUDIO_IO_FORKED_STREAM(void)98 AUDIO_IO_FORKED_STREAM::~AUDIO_IO_FORKED_STREAM(void)
99 {
100   if (pid_of_child_rep > 0)
101     clean_child(true);
102 }
103 
stop_io(void)104 void AUDIO_IO_FORKED_STREAM::stop_io(void)
105 {
106   ECA_LOG_MSG(ECA_LOGGER::user_objects, "stop_io()");
107   clean_child(false);
108 }
109 
110 /**
111  * If found, replaces the string '%f' with 'filename'. This is
112  * the file used by the forked child for input/output.
113  */
set_fork_file_name(const string & filename)114 void AUDIO_IO_FORKED_STREAM::set_fork_file_name(const string& filename)
115 {
116   object_rep = filename;
117   /* do not yet replace %f yet as it would make it more
118      difficult to tokenize the exec string */
119 }
120 
121 /**
122  * If found, replaces the string '%F' with a path name to a
123  * temporary named pipe. This pipe will be used for communicating
124  * with the forked child instead of standard input and output pipes.
125  */
set_fork_pipe_name(void)126 void AUDIO_IO_FORKED_STREAM::set_fork_pipe_name(void)
127 {
128   if (command_rep.find("%F") != string::npos) {
129     use_named_pipe_rep = true;
130     init_temp_directory();
131     if (tempfile_dir_rep.is_valid() == true) {
132       tmpfile_repp = tempfile_dir_rep.create_filename("fork-pipe", ".raw");
133       ::mkfifo(tmpfile_repp.c_str(), 0755);
134       command_rep.replace(command_rep.find("%F"), 2, tmpfile_repp);
135       tmp_file_created_rep = true;
136     }
137     else
138       tmp_file_created_rep = false;
139   }
140   else
141     use_named_pipe_rep = false;
142 }
143 
init_temp_directory(void)144 void AUDIO_IO_FORKED_STREAM::init_temp_directory(void)
145 {
146   string tmpdir ("ecasound-");
147   char* tmp_p = getenv("USER");
148   if (tmp_p != NULL) {
149     tmpdir += string(tmp_p);
150     tempfile_dir_rep.reserve_directory(tmpdir);
151   }
152   if (tempfile_dir_rep.is_valid() != true) {
153     ECA_LOG_MSG(ECA_LOGGER::info, "WARNING: Unable to create temporary directory \"" + tmpdir + "\".");
154   }
155 }
156 
157 /**
158  * If found, replaces the string '%c' with value of parameter
159  * 'channels'.
160  */
set_fork_channels(int channels)161 void AUDIO_IO_FORKED_STREAM::set_fork_channels(int channels)
162 {
163   if (command_rep.find("%c") != string::npos) {
164     command_rep.replace(command_rep.find("%c"), 2, kvu_numtostr(channels));
165   }
166 }
167 
168 /**
169  * If found, replaces the string '%s' with value of parameter
170  * 'sample_rate', and '%S' with 'sample_rate/1000' (kHz).
171  */
set_fork_sample_rate(long int sample_rate)172 void AUDIO_IO_FORKED_STREAM::set_fork_sample_rate(long int sample_rate)
173 {
174   if (command_rep.find("%s") != string::npos) {
175     command_rep.replace(command_rep.find("%s"), 2, kvu_numtostr(sample_rate));
176   }
177   if (command_rep.find("%S") != string::npos) {
178     command_rep.replace(command_rep.find("%S"), 2, kvu_numtostr(sample_rate/1000.0f));
179   }
180 }
181 
182 /**
183  * If found, replaces the string '%b' with value of parameter
184  * 'bits'.
185  */
set_fork_bits(int bits)186 void AUDIO_IO_FORKED_STREAM::set_fork_bits(int bits)
187 {
188   if (command_rep.find("%b") != string::npos) {
189     command_rep.replace(command_rep.find("%b"), 2, kvu_numtostr(bits));
190   }
191 }
192 
fork_child_for_read(void)193 void AUDIO_IO_FORKED_STREAM::fork_child_for_read(void)
194 {
195   ECA_LOG_MSG(ECA_LOGGER::user_objects, "Fork child-for-read: '" + fork_command() + "'");
196 
197   init_state_before_fork();
198 
199   if (use_named_pipe_rep == true) {
200     if (tmp_file_created_rep == true) {
201       fork_child_for_fifo_read();
202     }
203     else {
204       last_fork_rep = false;
205     }
206   }
207   else {
208     int fpipes[2];
209     if (pipe(fpipes) == 0) {
210       sigkill_sent_rep = false;
211       pid_of_child_rep = fork();
212       if (pid_of_child_rep == 0) {
213 	// ---
214 	// child
215 	// ---
216 
217 	sigset_t newset;
218 	sigemptyset(&newset);
219 
220 	sigaddset(&newset, SIGTERM);
221 	sigaddset(&newset, SIGPIPE);
222 
223 #if defined(HAVE_PTHREAD_SIGMASK)
224 	pthread_sigmask(SIG_UNBLOCK, &newset, NULL);
225 #elif defined(HAVE_SIGPROCMASK)
226 	sigprocmask(SIG_UNBLOCK, &newset, NULL);
227 #endif
228 
229 	::close(1);
230 	dup2(fpipes[1], 1);
231 	::close(fpipes[0]);
232 	::close(fpipes[1]);
233 	freopen("/dev/null", "w", stderr);
234 	int res = afs_run_exec(command_rep, object_rep);
235 	::close(1);
236 	exit(res);
237 	cerr << "You shouldn't see this!\n";
238       }
239       else if (pid_of_child_rep > 0) {
240 	// ---
241 	// parent
242 	// ---
243 
244 	pid_of_parent_rep = ::getpid();
245 	::close(fpipes[1]);
246 	fd_rep = fpipes[0];
247 	afs_fd_set_cloexec(fd_rep);
248 	if (wait_for_child() == true)
249 	  last_fork_rep = true;
250 	else
251 	  last_fork_rep = false;
252       }
253     }
254   }
255 }
256 
257 /**
258  * Initializes state that needs to be reset/refresh
259  * between every new fork of a child object.
260  */
init_state_before_fork(void)261 void AUDIO_IO_FORKED_STREAM::init_state_before_fork(void)
262 {
263   last_fork_rep = false;
264   fd_rep = 0;
265 
266   if (do_supports_seeking() != true)
267     do_set_position_in_samples(0);
268 }
269 
fork_child_for_fifo_read(void)270 void AUDIO_IO_FORKED_STREAM::fork_child_for_fifo_read(void)
271 {
272   ECA_LOG_MSG(ECA_LOGGER::user_objects, "Fork child-for-fifo-read: '" + fork_command() + "'");
273 
274   init_state_before_fork();
275 
276   sigkill_sent_rep = false;
277   pid_of_child_rep = fork();
278   if (pid_of_child_rep == 0) {
279     // ---
280     // child
281     // ---
282 
283     sigset_t newset;
284     sigemptyset(&newset);
285     sigaddset(&newset, SIGTERM);
286     sigaddset(&newset, SIGPIPE);
287 
288 #if defined(HAVE_PTHREAD_SIGMASK)
289     pthread_sigmask(SIG_UNBLOCK, &newset, NULL);
290 #elif defined(HAVE_SIGPROCMASK)
291     sigprocmask(SIG_UNBLOCK, &newset, NULL);
292 #endif
293 
294     freopen("/dev/null", "w", stderr);
295     int res = afs_run_exec(command_rep, object_rep);
296     if (res < 0) {
297       /**
298        * If execvp failed, make sure that the other end of
299        * the pipe doesn't block forever.
300        */
301       cerr << "execvp() failed!\n";
302       int fd = open(tmpfile_repp.c_str(), O_WRONLY);
303       close(fd);
304     }
305 
306     exit(res);
307     cerr << "You shouldn't see this!\n";
308   }
309   else if (pid_of_child_rep > 0) {
310     // ---
311     // parent
312     // ---
313 
314     pid_of_parent_rep = ::getpid();
315     fd_rep = 0;
316     if (wait_for_child() == true)
317       fd_rep = ::open(tmpfile_repp.c_str(), O_RDONLY);
318     if (fd_rep > 0) {
319       last_fork_rep = true;
320       afs_fd_set_cloexec(fd_rep);
321     }
322   }
323 }
324 
fork_child_for_write(void)325 void AUDIO_IO_FORKED_STREAM::fork_child_for_write(void)
326 {
327   ECA_LOG_MSG(ECA_LOGGER::user_objects, "Fork child-for-write: '" + fork_command() + "'");
328 
329   init_state_before_fork();
330 
331   int fpipes[2];
332   if (pipe(fpipes) == 0) {
333     sigkill_sent_rep = false;
334     pid_of_child_rep = fork();
335     if (pid_of_child_rep == 0) {
336       // ---
337       // child
338       // ---
339       sigset_t newset;
340       sigaddset(&newset, SIGTERM);
341       sigaddset(&newset, SIGPIPE);
342 
343 #if defined(HAVE_PTHREAD_SIGMASK)
344       pthread_sigmask(SIG_UNBLOCK, &newset, NULL);
345 #elif defined(HAVE_SIGPROCMASK)
346       sigprocmask(SIG_UNBLOCK, &newset, NULL);
347 #endif
348 
349       ::close(0);
350       ::dup2(fpipes[0],0);
351       ::close(fpipes[0]);
352       ::close(fpipes[1]);
353       freopen("/dev/null", "w", stderr);
354       exit(afs_run_exec(command_rep, object_rep));
355       cerr << "You shouln't see this!\n";
356     }
357     else if (pid_of_child_rep > 0) {
358       // ---
359       // parent
360       // ---
361       pid_of_parent_rep = ::getpid();
362       ::close(fpipes[0]);
363       fd_rep = fpipes[1];
364 
365       /* make sure in case the parent forks again, the fd_rep
366        * is closed -> otherwise the mechanism to signal end-of-stream
367        * gets broken */
368       afs_fd_set_cloexec(fd_rep);
369 
370       if (wait_for_child() == true)
371 	last_fork_rep = true;
372       else
373 	last_fork_rep = false;
374     }
375   }
376 }
377 
378 /**
379  * Cleans (waits for) the forked child process. Note! This
380  * function should be called from the same thread as
381  * fork_child_for_read/write() was called.
382  *
383  * In case the function is called from a different thread,
384  * it attemts to terminate the child anyways, but the child's
385  *  state is not known exactly when function returns.
386  *
387  * @param force if true, client is terminated with SIGKILL,
388  *              which guarantees that it terminates (but
389  *              possibly without going through normal
390  *              exit procedure); should be avoided especially
391  *              for output objects as this may result in
392  *              data loss
393  */
clean_child(bool force)394 void AUDIO_IO_FORKED_STREAM::clean_child(bool force)
395 {
396   if (fd_rep > 0) {
397     /* close the pipe between this process and the forked child
398      * process, should terminate the forked application -> see
399      * waitpid() below */
400     ECA_LOG_MSG(ECA_LOGGER::system_objects,
401 		"closing pipe handle for: " + object_rep);
402     ::close(fd_rep);
403     fd_rep = -1;
404   }
405 
406   if (pid_of_child_rep > 0 &&
407       force == true) {
408     if (sigkill_sent_rep != true) {
409       ECA_LOG_MSG(ECA_LOGGER::system_objects,
410 		  "Sending SIGKILL to child process related to: "
411 		  + object_rep);
412       kill(pid_of_child_rep, SIGKILL);
413       sigkill_sent_rep = true;
414     }
415     else {
416       /* SIGKILL already sent once for this process, don't send it again */
417       pid_of_child_rep = -1;
418     }
419   }
420 
421   if (pid_of_child_rep > 0 &&
422       pid_of_parent_rep == getpid()) {
423     /* wait until child process has exited
424      * note: this only works reliable when our pid is
425      *       the same as used for starting the child */
426     int flags = 0;
427     int status = 0;
428 
429     ECA_LOG_MSG(ECA_LOGGER::system_objects,
430 		"waitpid() for: " + object_rep);
431     int res = waitpid(pid_of_child_rep, &status, flags);
432 
433     if (res == pid_of_child_rep) {
434       ECA_LOG_MSG(ECA_LOGGER::system_objects, "Child process exit ok: "
435 		  + object_rep);
436       pid_of_child_rep = 0;
437     }
438     else {
439       ECA_LOG_MSG(ECA_LOGGER::system_objects, "Problems in terminating child process:" + std::string(strerror(errno)));
440     }
441   }
442 
443   if (pid_of_child_rep > 0) {
444     /* unable to use wait(), terminating with a signal */
445     ECA_LOG_MSG(ECA_LOGGER::system_objects, "Child not responding, sending SIGTERM: " +
446 		object_rep);
447     kill(pid_of_child_rep, SIGTERM);
448     pid_of_child_rep = 0;
449   }
450 
451   if (tmp_file_created_rep == true) {
452     ::remove(tmpfile_repp.c_str());
453     tmp_file_created_rep = false;
454   }
455 }
456 
457 /**
458  * Checks whether child is still active. Returns false
459  * if child has exited, otherwise true.
460  */
wait_for_child(void) const461 bool AUDIO_IO_FORKED_STREAM::wait_for_child(void) const
462 {
463   if (pid_of_child_rep <= 0)
464     return false;
465   else if (pid_of_parent_rep == getpid() &&
466 	   pid_of_child_rep > 0) {
467     int pid = waitpid(pid_of_child_rep, 0, WNOHANG);
468     if (pid == pid_of_child_rep) {
469       return false;
470     }
471     /* no change in state, so still active */
472     return true;
473   }
474   else
475     /* note: we don't really know so assume that yes */
476     return true;
477 }
478