1 // ------------------------------------------------------------------------
2 // audioio-forked-streams.cpp: Helper class providing routines for
3 // forking for piped input/output.
4 // Copyright (C) 2000-2004,2006,2008,2011 Kai Vehmanen
5 //
6 // Attributes:
7 // eca-style-version: 3 (see Ecasound Programmer's Guide)
8 //
9 // This program is free software; you can redistribute it and/or modify
10 // it under the terms of the GNU General Public License as published by
11 // the Free Software Foundation; either version 2 of the License, or
12 // (at your option) any later version.
13 //
14 // This program is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 // GNU General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 // ------------------------------------------------------------------------
23
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <cstdlib>
29 #include <vector>
30 #include <string>
31 #include <cstring>
32 #include <iostream>
33 #include <cstring>
34 #include <cstdio>
35
36 #include <sys/stat.h>
37 #include <sys/types.h>
38 #include <sys/wait.h>
39 #include <fcntl.h>
40 #include <signal.h>
41 #include <unistd.h>
42 #include <errno.h>
43
44 #include <kvu_dbc.h>
45 #include <kvu_numtostr.h>
46 #include <kvu_utils.h>
47
48 #include "eca-logger.h"
49 #include "audioio-forked-stream.h"
50
51 using namespace std;
52
53 /**
54 * Maximum number of arguments passed to exec()
55 */
56 const static int afs_max_exec_args = 1024;
57
58 /**
59 * Runs exec() with the given parameters.
60 * @return exec() return value
61 */
afs_run_exec(const string & command,const string & filename)62 static int afs_run_exec(const string& command, const string& filename)
63 {
64 vector<string> temp = kvu_string_to_tokens_quoted(command);
65 if (static_cast<int>(temp.size()) > afs_max_exec_args) {
66 temp.resize(afs_max_exec_args);
67 ECA_LOG_MSG(ECA_LOGGER::info, "WARNING: too many arguments for external application, truncating.");
68 }
69 const char* args[afs_max_exec_args];
70 vector<string>::size_type p = 0;
71 while(p < temp.size()) {
72 if (temp[p].find("%f") != string::npos) {
73 temp[p].replace(temp[p].find("%f"), 2, filename);
74 args[p] = temp[p].c_str();
75 }
76 else
77 args[p] = temp[p].c_str();
78 ++p;
79 }
80 args[p] = 0;
81 return execvp(temp[0].c_str(), const_cast<char**>(args));
82 }
83
afs_fd_set_cloexec(int fd)84 static int afs_fd_set_cloexec(int fd)
85 {
86 int flags;
87 flags = fcntl(fd, F_GETFD);
88 if (flags >= 0) {
89 flags |= FD_CLOEXEC;
90 if (fcntl(fd, F_SETFD, flags) >= 0)
91 return 0;
92 }
93 ECA_LOG_MSG(ECA_LOGGER::info, "unable to set FD_CLOEXEC: " +
94 std::string(strerror(errno)));
95 return -1;
96 }
97
~AUDIO_IO_FORKED_STREAM(void)98 AUDIO_IO_FORKED_STREAM::~AUDIO_IO_FORKED_STREAM(void)
99 {
100 if (pid_of_child_rep > 0)
101 clean_child(true);
102 }
103
stop_io(void)104 void AUDIO_IO_FORKED_STREAM::stop_io(void)
105 {
106 ECA_LOG_MSG(ECA_LOGGER::user_objects, "stop_io()");
107 clean_child(false);
108 }
109
110 /**
111 * If found, replaces the string '%f' with 'filename'. This is
112 * the file used by the forked child for input/output.
113 */
set_fork_file_name(const string & filename)114 void AUDIO_IO_FORKED_STREAM::set_fork_file_name(const string& filename)
115 {
116 object_rep = filename;
117 /* do not yet replace %f yet as it would make it more
118 difficult to tokenize the exec string */
119 }
120
121 /**
122 * If found, replaces the string '%F' with a path name to a
123 * temporary named pipe. This pipe will be used for communicating
124 * with the forked child instead of standard input and output pipes.
125 */
set_fork_pipe_name(void)126 void AUDIO_IO_FORKED_STREAM::set_fork_pipe_name(void)
127 {
128 if (command_rep.find("%F") != string::npos) {
129 use_named_pipe_rep = true;
130 init_temp_directory();
131 if (tempfile_dir_rep.is_valid() == true) {
132 tmpfile_repp = tempfile_dir_rep.create_filename("fork-pipe", ".raw");
133 ::mkfifo(tmpfile_repp.c_str(), 0755);
134 command_rep.replace(command_rep.find("%F"), 2, tmpfile_repp);
135 tmp_file_created_rep = true;
136 }
137 else
138 tmp_file_created_rep = false;
139 }
140 else
141 use_named_pipe_rep = false;
142 }
143
init_temp_directory(void)144 void AUDIO_IO_FORKED_STREAM::init_temp_directory(void)
145 {
146 string tmpdir ("ecasound-");
147 char* tmp_p = getenv("USER");
148 if (tmp_p != NULL) {
149 tmpdir += string(tmp_p);
150 tempfile_dir_rep.reserve_directory(tmpdir);
151 }
152 if (tempfile_dir_rep.is_valid() != true) {
153 ECA_LOG_MSG(ECA_LOGGER::info, "WARNING: Unable to create temporary directory \"" + tmpdir + "\".");
154 }
155 }
156
157 /**
158 * If found, replaces the string '%c' with value of parameter
159 * 'channels'.
160 */
set_fork_channels(int channels)161 void AUDIO_IO_FORKED_STREAM::set_fork_channels(int channels)
162 {
163 if (command_rep.find("%c") != string::npos) {
164 command_rep.replace(command_rep.find("%c"), 2, kvu_numtostr(channels));
165 }
166 }
167
168 /**
169 * If found, replaces the string '%s' with value of parameter
170 * 'sample_rate', and '%S' with 'sample_rate/1000' (kHz).
171 */
set_fork_sample_rate(long int sample_rate)172 void AUDIO_IO_FORKED_STREAM::set_fork_sample_rate(long int sample_rate)
173 {
174 if (command_rep.find("%s") != string::npos) {
175 command_rep.replace(command_rep.find("%s"), 2, kvu_numtostr(sample_rate));
176 }
177 if (command_rep.find("%S") != string::npos) {
178 command_rep.replace(command_rep.find("%S"), 2, kvu_numtostr(sample_rate/1000.0f));
179 }
180 }
181
182 /**
183 * If found, replaces the string '%b' with value of parameter
184 * 'bits'.
185 */
set_fork_bits(int bits)186 void AUDIO_IO_FORKED_STREAM::set_fork_bits(int bits)
187 {
188 if (command_rep.find("%b") != string::npos) {
189 command_rep.replace(command_rep.find("%b"), 2, kvu_numtostr(bits));
190 }
191 }
192
fork_child_for_read(void)193 void AUDIO_IO_FORKED_STREAM::fork_child_for_read(void)
194 {
195 ECA_LOG_MSG(ECA_LOGGER::user_objects, "Fork child-for-read: '" + fork_command() + "'");
196
197 init_state_before_fork();
198
199 if (use_named_pipe_rep == true) {
200 if (tmp_file_created_rep == true) {
201 fork_child_for_fifo_read();
202 }
203 else {
204 last_fork_rep = false;
205 }
206 }
207 else {
208 int fpipes[2];
209 if (pipe(fpipes) == 0) {
210 sigkill_sent_rep = false;
211 pid_of_child_rep = fork();
212 if (pid_of_child_rep == 0) {
213 // ---
214 // child
215 // ---
216
217 sigset_t newset;
218 sigemptyset(&newset);
219
220 sigaddset(&newset, SIGTERM);
221 sigaddset(&newset, SIGPIPE);
222
223 #if defined(HAVE_PTHREAD_SIGMASK)
224 pthread_sigmask(SIG_UNBLOCK, &newset, NULL);
225 #elif defined(HAVE_SIGPROCMASK)
226 sigprocmask(SIG_UNBLOCK, &newset, NULL);
227 #endif
228
229 ::close(1);
230 dup2(fpipes[1], 1);
231 ::close(fpipes[0]);
232 ::close(fpipes[1]);
233 freopen("/dev/null", "w", stderr);
234 int res = afs_run_exec(command_rep, object_rep);
235 ::close(1);
236 exit(res);
237 cerr << "You shouldn't see this!\n";
238 }
239 else if (pid_of_child_rep > 0) {
240 // ---
241 // parent
242 // ---
243
244 pid_of_parent_rep = ::getpid();
245 ::close(fpipes[1]);
246 fd_rep = fpipes[0];
247 afs_fd_set_cloexec(fd_rep);
248 if (wait_for_child() == true)
249 last_fork_rep = true;
250 else
251 last_fork_rep = false;
252 }
253 }
254 }
255 }
256
257 /**
258 * Initializes state that needs to be reset/refresh
259 * between every new fork of a child object.
260 */
init_state_before_fork(void)261 void AUDIO_IO_FORKED_STREAM::init_state_before_fork(void)
262 {
263 last_fork_rep = false;
264 fd_rep = 0;
265
266 if (do_supports_seeking() != true)
267 do_set_position_in_samples(0);
268 }
269
fork_child_for_fifo_read(void)270 void AUDIO_IO_FORKED_STREAM::fork_child_for_fifo_read(void)
271 {
272 ECA_LOG_MSG(ECA_LOGGER::user_objects, "Fork child-for-fifo-read: '" + fork_command() + "'");
273
274 init_state_before_fork();
275
276 sigkill_sent_rep = false;
277 pid_of_child_rep = fork();
278 if (pid_of_child_rep == 0) {
279 // ---
280 // child
281 // ---
282
283 sigset_t newset;
284 sigemptyset(&newset);
285 sigaddset(&newset, SIGTERM);
286 sigaddset(&newset, SIGPIPE);
287
288 #if defined(HAVE_PTHREAD_SIGMASK)
289 pthread_sigmask(SIG_UNBLOCK, &newset, NULL);
290 #elif defined(HAVE_SIGPROCMASK)
291 sigprocmask(SIG_UNBLOCK, &newset, NULL);
292 #endif
293
294 freopen("/dev/null", "w", stderr);
295 int res = afs_run_exec(command_rep, object_rep);
296 if (res < 0) {
297 /**
298 * If execvp failed, make sure that the other end of
299 * the pipe doesn't block forever.
300 */
301 cerr << "execvp() failed!\n";
302 int fd = open(tmpfile_repp.c_str(), O_WRONLY);
303 close(fd);
304 }
305
306 exit(res);
307 cerr << "You shouldn't see this!\n";
308 }
309 else if (pid_of_child_rep > 0) {
310 // ---
311 // parent
312 // ---
313
314 pid_of_parent_rep = ::getpid();
315 fd_rep = 0;
316 if (wait_for_child() == true)
317 fd_rep = ::open(tmpfile_repp.c_str(), O_RDONLY);
318 if (fd_rep > 0) {
319 last_fork_rep = true;
320 afs_fd_set_cloexec(fd_rep);
321 }
322 }
323 }
324
fork_child_for_write(void)325 void AUDIO_IO_FORKED_STREAM::fork_child_for_write(void)
326 {
327 ECA_LOG_MSG(ECA_LOGGER::user_objects, "Fork child-for-write: '" + fork_command() + "'");
328
329 init_state_before_fork();
330
331 int fpipes[2];
332 if (pipe(fpipes) == 0) {
333 sigkill_sent_rep = false;
334 pid_of_child_rep = fork();
335 if (pid_of_child_rep == 0) {
336 // ---
337 // child
338 // ---
339 sigset_t newset;
340 sigaddset(&newset, SIGTERM);
341 sigaddset(&newset, SIGPIPE);
342
343 #if defined(HAVE_PTHREAD_SIGMASK)
344 pthread_sigmask(SIG_UNBLOCK, &newset, NULL);
345 #elif defined(HAVE_SIGPROCMASK)
346 sigprocmask(SIG_UNBLOCK, &newset, NULL);
347 #endif
348
349 ::close(0);
350 ::dup2(fpipes[0],0);
351 ::close(fpipes[0]);
352 ::close(fpipes[1]);
353 freopen("/dev/null", "w", stderr);
354 exit(afs_run_exec(command_rep, object_rep));
355 cerr << "You shouln't see this!\n";
356 }
357 else if (pid_of_child_rep > 0) {
358 // ---
359 // parent
360 // ---
361 pid_of_parent_rep = ::getpid();
362 ::close(fpipes[0]);
363 fd_rep = fpipes[1];
364
365 /* make sure in case the parent forks again, the fd_rep
366 * is closed -> otherwise the mechanism to signal end-of-stream
367 * gets broken */
368 afs_fd_set_cloexec(fd_rep);
369
370 if (wait_for_child() == true)
371 last_fork_rep = true;
372 else
373 last_fork_rep = false;
374 }
375 }
376 }
377
378 /**
379 * Cleans (waits for) the forked child process. Note! This
380 * function should be called from the same thread as
381 * fork_child_for_read/write() was called.
382 *
383 * In case the function is called from a different thread,
384 * it attemts to terminate the child anyways, but the child's
385 * state is not known exactly when function returns.
386 *
387 * @param force if true, client is terminated with SIGKILL,
388 * which guarantees that it terminates (but
389 * possibly without going through normal
390 * exit procedure); should be avoided especially
391 * for output objects as this may result in
392 * data loss
393 */
clean_child(bool force)394 void AUDIO_IO_FORKED_STREAM::clean_child(bool force)
395 {
396 if (fd_rep > 0) {
397 /* close the pipe between this process and the forked child
398 * process, should terminate the forked application -> see
399 * waitpid() below */
400 ECA_LOG_MSG(ECA_LOGGER::system_objects,
401 "closing pipe handle for: " + object_rep);
402 ::close(fd_rep);
403 fd_rep = -1;
404 }
405
406 if (pid_of_child_rep > 0 &&
407 force == true) {
408 if (sigkill_sent_rep != true) {
409 ECA_LOG_MSG(ECA_LOGGER::system_objects,
410 "Sending SIGKILL to child process related to: "
411 + object_rep);
412 kill(pid_of_child_rep, SIGKILL);
413 sigkill_sent_rep = true;
414 }
415 else {
416 /* SIGKILL already sent once for this process, don't send it again */
417 pid_of_child_rep = -1;
418 }
419 }
420
421 if (pid_of_child_rep > 0 &&
422 pid_of_parent_rep == getpid()) {
423 /* wait until child process has exited
424 * note: this only works reliable when our pid is
425 * the same as used for starting the child */
426 int flags = 0;
427 int status = 0;
428
429 ECA_LOG_MSG(ECA_LOGGER::system_objects,
430 "waitpid() for: " + object_rep);
431 int res = waitpid(pid_of_child_rep, &status, flags);
432
433 if (res == pid_of_child_rep) {
434 ECA_LOG_MSG(ECA_LOGGER::system_objects, "Child process exit ok: "
435 + object_rep);
436 pid_of_child_rep = 0;
437 }
438 else {
439 ECA_LOG_MSG(ECA_LOGGER::system_objects, "Problems in terminating child process:" + std::string(strerror(errno)));
440 }
441 }
442
443 if (pid_of_child_rep > 0) {
444 /* unable to use wait(), terminating with a signal */
445 ECA_LOG_MSG(ECA_LOGGER::system_objects, "Child not responding, sending SIGTERM: " +
446 object_rep);
447 kill(pid_of_child_rep, SIGTERM);
448 pid_of_child_rep = 0;
449 }
450
451 if (tmp_file_created_rep == true) {
452 ::remove(tmpfile_repp.c_str());
453 tmp_file_created_rep = false;
454 }
455 }
456
457 /**
458 * Checks whether child is still active. Returns false
459 * if child has exited, otherwise true.
460 */
wait_for_child(void) const461 bool AUDIO_IO_FORKED_STREAM::wait_for_child(void) const
462 {
463 if (pid_of_child_rep <= 0)
464 return false;
465 else if (pid_of_parent_rep == getpid() &&
466 pid_of_child_rep > 0) {
467 int pid = waitpid(pid_of_child_rep, 0, WNOHANG);
468 if (pid == pid_of_child_rep) {
469 return false;
470 }
471 /* no change in state, so still active */
472 return true;
473 }
474 else
475 /* note: we don't really know so assume that yes */
476 return true;
477 }
478