1 // Copyright (C) 2016 Davis E. King (davis@dlib.net) 2 // License: Boost Software License See LICENSE.txt for the full license. 3 #ifndef DLIB_SUBPROCeSS_STREAM_H_ 4 #define DLIB_SUBPROCeSS_STREAM_H_ 5 6 #include <utility> 7 #include <unistd.h> 8 #include <iostream> 9 #include <memory> 10 #include <dlib/matrix.h> 11 #include <sys/types.h> 12 #include <sys/socket.h> 13 14 15 namespace dlib 16 { 17 18 // -------------------------------------------------------------------------------------- 19 20 // Call dlib's serialize and deserialize by default. The point of this version of 21 // serialize is to do something fast that normally we wouldn't do, like directly copy 22 // memory. This is safe since this is an interprocess communication happening the same 23 // machine. interprocess_serialize(const T & item,std::ostream & out)24 template <typename T> void interprocess_serialize ( const T& item, std::ostream& out) { serialize(item, out); } interprocess_deserialize(T & item,std::istream & in)25 template <typename T> void interprocess_deserialize (T& item, std::istream& in) { deserialize(item, in); } 26 27 // But have overloads for direct memory copies for some types since this is faster than 28 // their default serialization. 29 template <typename T, long NR, long NC, typename MM, typename L> interprocess_serialize(const dlib::matrix<T,NR,NC,MM,L> & item,std::ostream & out)30 void interprocess_serialize(const dlib::matrix<T,NR,NC,MM,L>& item, std::ostream& out) 31 { 32 dlib::serialize(item.nr(), out); 33 dlib::serialize(item.nc(), out); 34 if (item.size() != 0) 35 out.write((const char*)&item(0,0), sizeof(T)*item.size()); 36 if (!out) 37 throw dlib::serialization_error("Error writing matrix to interprocess iostream."); 38 } 39 40 template <typename T, long NR, long NC, typename MM, typename L> interprocess_deserialize(dlib::matrix<T,NR,NC,MM,L> & item,std::istream & in)41 void interprocess_deserialize(dlib::matrix<T,NR,NC,MM,L>& item, std::istream& in) 42 { 43 long nr, nc; 44 dlib::deserialize(nr, in); 45 dlib::deserialize(nc, in); 46 item.set_size(nr,nc); 47 if (item.size() != 0) 48 in.read((char*)&item(0,0), sizeof(T)*item.size()); 49 if (!in) 50 throw dlib::serialization_error("Error reading matrix from interprocess iostream."); 51 } 52 53 // ---------------------------------------------------------------------------------------- 54 55 namespace impl{ std::iostream& get_data_iostream(); } 56 send_to_parent_process()57 inline void send_to_parent_process() {impl::get_data_iostream().flush();} 58 template <typename U, typename ...T> send_to_parent_process(U && arg1,T &&...args)59 void send_to_parent_process(U&& arg1, T&& ...args) 60 /*! 61 ensures 62 - sends all the arguments to send_to_parent_process() to the parent process by 63 serializing them with interprocess_serialize(). 64 !*/ 65 { 66 interprocess_serialize(arg1, impl::get_data_iostream()); 67 send_to_parent_process(std::forward<T>(args)...); 68 if (!impl::get_data_iostream()) 69 throw dlib::error("Error sending object to parent process."); 70 } 71 receive_from_parent_process()72 inline void receive_from_parent_process() {} 73 template <typename U, typename ...T> receive_from_parent_process(U && arg1,T &&...args)74 void receive_from_parent_process(U&& arg1, T&& ...args) 75 /*! 76 ensures 77 - receives all the arguments to receive_from_parent_process() from the parent 78 process by deserializing them from interprocess_serialize(). 79 !*/ 80 { 81 interprocess_deserialize(arg1, impl::get_data_iostream()); 82 receive_from_parent_process(std::forward<T>(args)...); 83 if (!impl::get_data_iostream()) 84 throw dlib::error("Error receiving object from parent process."); 85 } 86 87 88 // ---------------------------------------------------------------------------------------- 89 90 class filestreambuf; 91 92 class subprocess_stream : noncopyable 93 { 94 /*! 95 WHAT THIS OBJECT REPRESENTS 96 This is a tool for spawning a subprocess and communicating with it. Here 97 is an example: 98 99 subprocess_stream s("/usr/bin/some_program"); 100 s.send(obj1, obj2, obj3); 101 s.receive(obj4, obj5); 102 s.wait(); // wait for sub process to terminate 103 104 Then in the sub process you would have: 105 106 receive_from_parent_process(obj1, obj2, obj3); 107 // do stuff 108 cout << "echo this text to parent cout" << endl; 109 send_to_parent_process(obj4, obj5); 110 111 112 Additionally, if the sub process writes to its standard out then that will 113 be echoed to std::cout in the parent process. Writing to std::cerr or 114 returning a non-zero value from main will also be noted by the parent 115 process and an appropriate exception will be thrown. 116 !*/ 117 118 public: 119 120 explicit subprocess_stream( 121 const char* program_name 122 ); 123 /*! 124 ensures 125 - spawns a sub process by executing the file with the given program_name. 126 !*/ 127 128 ~subprocess_stream( 129 ); 130 /*! 131 ensures 132 - calls wait(). Note that the destructor never throws even though wait() can. 133 If an exception is thrown by wait() it is just logged to std::cerr. 134 !*/ 135 136 void wait( 137 ); 138 /*! 139 ensures 140 - closes the input stream to the child process and then waits for the child 141 to terminate. 142 - If the child returns an error (by returning != 0 from its main) or 143 outputs to its standard error then wait() throws a dlib::error() with the 144 standard error output in it. 145 !*/ 146 get_child_pid()147 int get_child_pid() const { return child_pid; } 148 /*! 149 ensures 150 - returns the PID of the child process 151 !*/ 152 153 template <typename U, typename ...T> send(U && arg1,T &&...args)154 void send(U&& arg1, T&& ...args) 155 /*! 156 ensures 157 - sends all the arguments to send() to the subprocess by serializing them 158 with interprocess_serialize(). 159 !*/ 160 { 161 interprocess_serialize(arg1, iosub); 162 send(std::forward<T>(args)...); 163 if (!iosub) 164 { 165 std::ostringstream sout; 166 sout << stderr.rdbuf(); 167 throw dlib::error("Error sending object to child process.\n" + sout.str()); 168 } 169 } send()170 void send() {iosub.flush();} 171 172 template <typename U, typename ...T> receive(U && arg1,T &&...args)173 void receive(U&& arg1, T&& ...args) 174 /*! 175 ensures 176 - receives all the arguments to receive() to the subprocess by deserializing 177 them with interprocess_deserialize(). 178 !*/ 179 { 180 interprocess_deserialize(arg1, iosub); 181 receive(std::forward<T>(args)...); 182 if (!iosub) 183 { 184 std::ostringstream sout; 185 sout << stderr.rdbuf(); 186 throw dlib::error("Error receiving object from child process.\n" + sout.str() ); 187 } 188 } receive()189 void receive() {} 190 191 192 private: 193 194 void send_eof(); 195 196 class cpipe : noncopyable 197 { 198 private: 199 int fd[2]; 200 public: cpipe()201 cpipe() { if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fd)) throw dlib::error("Failed to create pipe"); } ~cpipe()202 ~cpipe() { close(); } parent_fd()203 int parent_fd() const { return fd[0]; } child_fd()204 int child_fd() const { return fd[1]; } close()205 void close() { ::close(fd[0]); ::close(fd[1]); } 206 }; 207 208 cpipe data_pipe; 209 cpipe stdout_pipe; 210 cpipe stderr_pipe; 211 bool wait_called = false; 212 std::unique_ptr<filestreambuf> inout_buf; 213 std::unique_ptr<filestreambuf> err_buf; 214 int child_pid = -1; 215 std::istream stderr; 216 std::iostream iosub; 217 }; 218 } 219 220 // ---------------------------------------------------------------------------------------- 221 222 #endif // DLIB_SUBPROCeSS_STREAM_H_ 223 224