1 // Copyright (C) 2016  Davis E. King (davis@dlib.net)
2 // License: Boost Software License   See LICENSE.txt for the full license.
3 #ifndef DLIB_SUBPROCeSS_STREAM_H_
4 #define DLIB_SUBPROCeSS_STREAM_H_
5 
6 #include <utility>
7 #include <unistd.h>
8 #include <iostream>
9 #include <memory>
10 #include <dlib/matrix.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 
14 
15 namespace dlib
16 {
17 
18 // --------------------------------------------------------------------------------------
19 
20     // Call dlib's serialize and deserialize by default.   The point of this version of
21     // serialize is to do something fast that normally we wouldn't do, like directly copy
22     // memory.  This is safe since this is an interprocess communication happening the same
23     // machine.
interprocess_serialize(const T & item,std::ostream & out)24     template <typename T> void interprocess_serialize ( const T& item, std::ostream& out) { serialize(item, out); }
interprocess_deserialize(T & item,std::istream & in)25     template <typename T> void interprocess_deserialize (T& item, std::istream& in) { deserialize(item, in); }
26 
27     // But have overloads for direct memory copies for some types since this is faster than
28     // their default serialization.
29     template <typename T, long NR, long NC, typename MM, typename L>
interprocess_serialize(const dlib::matrix<T,NR,NC,MM,L> & item,std::ostream & out)30     void interprocess_serialize(const dlib::matrix<T,NR,NC,MM,L>& item, std::ostream& out)
31     {
32         dlib::serialize(item.nr(), out);
33         dlib::serialize(item.nc(), out);
34         if (item.size() != 0)
35             out.write((const char*)&item(0,0), sizeof(T)*item.size());
36         if (!out)
37             throw dlib::serialization_error("Error writing matrix to interprocess iostream.");
38     }
39 
40     template <typename T, long NR, long NC, typename MM, typename L>
interprocess_deserialize(dlib::matrix<T,NR,NC,MM,L> & item,std::istream & in)41     void interprocess_deserialize(dlib::matrix<T,NR,NC,MM,L>& item, std::istream& in)
42     {
43         long nr, nc;
44         dlib::deserialize(nr, in);
45         dlib::deserialize(nc, in);
46         item.set_size(nr,nc);
47         if (item.size() != 0)
48             in.read((char*)&item(0,0), sizeof(T)*item.size());
49         if (!in)
50             throw dlib::serialization_error("Error reading matrix from interprocess iostream.");
51     }
52 
53 // ----------------------------------------------------------------------------------------
54 
55     namespace impl{ std::iostream& get_data_iostream(); }
56 
send_to_parent_process()57     inline void send_to_parent_process() {impl::get_data_iostream().flush();}
58     template <typename U, typename ...T>
send_to_parent_process(U && arg1,T &&...args)59     void send_to_parent_process(U&& arg1, T&& ...args)
60     /*!
61         ensures
62             - sends all the arguments to send_to_parent_process() to the parent process by
63               serializing them with interprocess_serialize().
64     !*/
65     {
66         interprocess_serialize(arg1, impl::get_data_iostream());
67         send_to_parent_process(std::forward<T>(args)...);
68         if (!impl::get_data_iostream())
69             throw dlib::error("Error sending object to parent process.");
70     }
71 
receive_from_parent_process()72     inline void receive_from_parent_process() {}
73     template <typename U, typename ...T>
receive_from_parent_process(U && arg1,T &&...args)74     void receive_from_parent_process(U&& arg1, T&& ...args)
75     /*!
76         ensures
77             - receives all the arguments to receive_from_parent_process() from the parent
78               process by deserializing them from interprocess_serialize().
79     !*/
80     {
81         interprocess_deserialize(arg1, impl::get_data_iostream());
82         receive_from_parent_process(std::forward<T>(args)...);
83         if (!impl::get_data_iostream())
84             throw dlib::error("Error receiving object from parent process.");
85     }
86 
87 
88 // ----------------------------------------------------------------------------------------
89 
90     class filestreambuf;
91 
92     class subprocess_stream : noncopyable
93     {
94         /*!
95             WHAT THIS OBJECT REPRESENTS
96                 This is a tool for spawning a subprocess and communicating with it.  Here
97                 is an example:
98 
99                     subprocess_stream s("/usr/bin/some_program");
100                     s.send(obj1, obj2, obj3);
101                     s.receive(obj4, obj5);
102                     s.wait(); // wait for sub process to terminate
103 
104                 Then in the sub process you would have:
105 
106                     receive_from_parent_process(obj1, obj2, obj3);
107                     // do stuff
108                     cout << "echo this text to parent cout" << endl;
109                     send_to_parent_process(obj4, obj5);
110 
111 
112                 Additionally, if the sub process writes to its standard out then that will
113                 be echoed to std::cout in the parent process.  Writing to std::cerr or
114                 returning a non-zero value from main will also be noted by the parent
115                 process and an appropriate exception will be thrown.
116         !*/
117 
118     public:
119 
120         explicit subprocess_stream(
121             const char* program_name
122         );
123         /*!
124             ensures
125                 - spawns a sub process by executing the file with the given program_name.
126         !*/
127 
128         ~subprocess_stream(
129         );
130         /*!
131             ensures
132                 - calls wait().  Note that the destructor never throws even though wait() can.
133                   If an exception is thrown by wait() it is just logged to std::cerr.
134         !*/
135 
136         void wait(
137         );
138         /*!
139             ensures
140                 - closes the input stream to the child process and then waits for the child
141                   to terminate.
142                 - If the child returns an error (by returning != 0 from its main) or
143                   outputs to its standard error then wait() throws a dlib::error() with the
144                   standard error output in it.
145         !*/
146 
get_child_pid()147         int get_child_pid() const { return child_pid; }
148         /*!
149             ensures
150                 - returns the PID of the child process
151         !*/
152 
153         template <typename U, typename ...T>
send(U && arg1,T &&...args)154         void send(U&& arg1, T&& ...args)
155         /*!
156             ensures
157                 - sends all the arguments to send() to the subprocess by serializing them
158                   with interprocess_serialize().
159         !*/
160         {
161             interprocess_serialize(arg1, iosub);
162             send(std::forward<T>(args)...);
163             if (!iosub)
164             {
165                 std::ostringstream sout;
166                 sout << stderr.rdbuf();
167                 throw dlib::error("Error sending object to child process.\n" + sout.str());
168             }
169         }
send()170         void send() {iosub.flush();}
171 
172         template <typename U, typename ...T>
receive(U && arg1,T &&...args)173         void receive(U&& arg1, T&& ...args)
174         /*!
175             ensures
176                 - receives all the arguments to receive() to the subprocess by deserializing
177                   them with interprocess_deserialize().
178         !*/
179         {
180             interprocess_deserialize(arg1, iosub);
181             receive(std::forward<T>(args)...);
182             if (!iosub)
183             {
184                 std::ostringstream sout;
185                 sout << stderr.rdbuf();
186                 throw dlib::error("Error receiving object from child process.\n" + sout.str() );
187             }
188         }
receive()189         void receive() {}
190 
191 
192     private:
193 
194         void send_eof();
195 
196         class cpipe : noncopyable
197         {
198         private:
199             int fd[2];
200         public:
cpipe()201             cpipe() { if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fd)) throw dlib::error("Failed to create pipe"); }
~cpipe()202             ~cpipe() { close(); }
parent_fd()203             int parent_fd() const { return fd[0]; }
child_fd()204             int child_fd() const { return fd[1]; }
close()205             void close() { ::close(fd[0]); ::close(fd[1]); }
206         };
207 
208         cpipe data_pipe;
209         cpipe stdout_pipe;
210         cpipe stderr_pipe;
211         bool wait_called = false;
212         std::unique_ptr<filestreambuf> inout_buf;
213         std::unique_ptr<filestreambuf> err_buf;
214         int child_pid = -1;
215         std::istream stderr;
216         std::iostream iosub;
217     };
218 }
219 
220 // ----------------------------------------------------------------------------------------
221 
222 #endif // DLIB_SUBPROCeSS_STREAM_H_
223 
224