1 /////////////////////////////////////////////////////////////////////////////
2 // Name: PipeExecute.cpp
3 // Purpose: Pipe execute
4 // Author: Alex Thuering
5 // Created: 23.08.2004
6 // RCS-ID: $Id: PipeExecute.cpp,v 1.12 2016/08/04 16:32:15 ntalex Exp $
7 // Copyright: (c) Alex Thuering
8 // Licence: wxWindows licence
9 /////////////////////////////////////////////////////////////////////////////
10
11 #include "PipeExecute.h"
12 #include "utils.h"
13 #include <wx/process.h>
14 #include <wx/wfstream.h>
15 #include <wx/txtstrm.h>
16 #include <wx/log.h>
17
18 #define BUF_SIZE 4096
19 #define MAX_PROCESSES 5
20
21 #ifndef __WXMSW__
22
23 #include <unistd.h>
24 #include <fcntl.h>
25 class wxOutputStreamUnblocker: public wxFileOutputStream
26 {
27 public:
SetBlock(bool block=true)28 void SetBlock(bool block = true)
29 {
30 int flags = fcntl(m_file->fd(), F_GETFL) | O_NONBLOCK;
31 if (block)
32 flags -= O_NONBLOCK;
33 fcntl(m_file->fd(), F_SETFL, flags);
34 }
35 };
36
37 #else
38
39 #include <windows.h>
/**
 * Output stream that writes to a Windows pipe HANDLE.
 *
 * Only the declaration is given here; the member functions are not defined in
 * this file.
 * NOTE(review): presumably this mirrors the wxWidgets-internal class of the same
 * name so that m_hOutput can be reached by wxOutputStreamUnblocker - confirm, and
 * keep the member layout unchanged if so.
 */
class wxPipeOutputStream: public wxOutputStream
{
public:
	wxPipeOutputStream(HANDLE hOutput);
	virtual ~wxPipeOutputStream();

protected:
	// wxOutputStream write hook
	size_t OnSysWrite(const void *buffer, size_t len);

protected:
	// pipe handle passed to the constructor; used by the derived unblocker
	HANDLE m_hOutput;

	DECLARE_NO_COPY_CLASS(wxPipeOutputStream)
};
54 class wxOutputStreamUnblocker: public wxPipeOutputStream {
55 public:
wxOutputStreamUnblocker(HANDLE hOutput)56 wxOutputStreamUnblocker(HANDLE hOutput) : wxPipeOutputStream(hOutput) {}
57
SetBlock(bool block=true)58 void SetBlock(bool block = true) {
59 DWORD mode = PIPE_READMODE_BYTE | PIPE_NOWAIT;
60 if (block)
61 mode -= PIPE_NOWAIT;
62 if (!SetNamedPipeHandleState(m_hOutput, &mode, NULL, NULL)) {
63 wxLogLastError(_T("SetNamedPipeHandleState(PIPE_NOWAIT)"));
64 }
65 }
66 };
67
68 #endif
69
wxUnblockOutputStream(wxFileOutputStream & stream)70 void wxUnblockOutputStream(wxFileOutputStream& stream) {
71 ((wxOutputStreamUnblocker&)stream).SetBlock(false);
72 }
73
74 class PipeProcess: public wxProcess {
75 public:
76 /**
77 * Constructor
78 */
PipeProcess(wxPipeExecute * parent,PipeProcess * nextProc=NULL,wxFileInputStream * input=NULL,wxFileOutputStream * output=NULL)79 PipeProcess(wxPipeExecute* parent, PipeProcess* nextProc = NULL, wxFileInputStream* input = NULL,
80 wxFileOutputStream* output = NULL) :
81 m_parent(parent), m_nextProc(nextProc), m_input(input), m_output(output) {
82 m_isRunning = true;
83 m_exitCode = -1;
84 m_lastRead = m_lastWrite = 0;
85 Redirect();
86 }
87
88 /**
89 * Destructor
90 */
~PipeProcess()91 ~PipeProcess() {
92 if (m_inputLine.length()) {
93 m_parent->ProcessOutput(m_inputLine);
94 m_inputLine = wxT("");
95 }
96 if (m_errorLine.length()) {
97 m_parent->ProcessOutput(m_errorLine);
98 m_errorLine = wxT("");
99 }
100 }
101
102 /**
103 * Reads data from given stream and prints out (see ProcessOutput)
104 */
DoGetFromStream(wxInputStream & in,wxString & line)105 void DoGetFromStream(wxInputStream& in, wxString& line) {
106 wxTextInputStream tis(in);
107 while (in.CanRead()) {
108 wxYieldIfNeeded();
109 while (in.CanRead()) {
110 wxChar c = tis.GetChar();
111 if (c == wxT('\n') || c == wxT('\r')) {
112 if (line.length())
113 m_parent->ProcessOutput(line);
114 line = wxT("");
115 break;
116 }
117 line += c;
118 }
119 }
120 }
121
122 /**
123 * Reads from given stream and writes into process input stream
124 */
DoPutToStream(wxInputStream & in)125 bool DoPutToStream(wxInputStream& in) {
126 wxUnblockOutputStream((wxFileOutputStream&) *GetOutputStream());
127 if (m_lastWrite == m_lastRead) {
128 m_lastRead = in.Read(m_buffer, sizeof(m_buffer)).LastRead();
129 wxLogNull log;
130 m_lastWrite = GetOutputStream()->Write(m_buffer, m_lastRead).LastWrite();
131 } else {
132 wxLogNull log;
133 m_lastWrite += GetOutputStream()->Write(m_buffer + m_lastWrite, m_lastRead - m_lastWrite).LastWrite();
134 }
135 return m_lastWrite == m_lastRead;
136 }
137
138 /**
139 * Is some data lefts to write after last call of DoPutToStream()?
140 */
HasDataToPut()141 bool HasDataToPut() {
142 return m_lastWrite != m_lastRead;
143 }
144
145 /**
146 * Process of input/output data
147 */
HasInput()148 bool HasInput() {
149 bool hasInput = false;
150
151 // Read from input stream and write to process input
152 if (IsRunning() && m_input && (m_input->CanRead() || HasDataToPut())) {
153 if (DoPutToStream(*m_input) && m_input->Eof())
154 CloseOutput();
155 else
156 hasInput = true;
157 }
158
159 // read from process input
160 if (IsInputAvailable() || (m_nextProc && m_nextProc->HasDataToPut())) {
161 if (m_nextProc) // and write to next process
162 {
163 if (m_nextProc->IsRunning())
164 m_nextProc->DoPutToStream(*GetInputStream());
165 else
166 return false;
167 } else if (m_output) // and write in output fstream
168 {
169 while (GetInputStream()->CanRead()) {
170 wxYield();
171 char buffer[BUF_SIZE];
172 int cnt = GetInputStream()->Read(buffer, sizeof(buffer)).LastRead();
173 m_output->Write(buffer, cnt);
174 }
175 } else
176 // and print out
177 DoGetFromStream(*GetInputStream(), m_inputLine);
178 hasInput = true;
179 }
180
181 // read from process error stream and print out
182 if (IsErrorAvailable()) {
183 DoGetFromStream(*GetErrorStream(), m_errorLine);
184 hasInput = true;
185 }
186
187 // close input from next process if this process finished
188 if (m_nextProc && !hasInput && !IsRunning()) {
189 m_nextProc->CloseOutput();
190 m_nextProc = NULL;
191 }
192
193 return hasInput;
194 }
195
OnTerminate(int pid,int status)196 void OnTerminate(int pid, int status) {
197 m_exitCode = status;
198 m_isRunning = false;
199 }
200
IsRunning() const201 bool IsRunning() const {
202 return m_isRunning;
203 }
204
GetExitCode() const205 int GetExitCode() const {
206 return m_exitCode;
207 }
208
209 protected:
210 wxPipeExecute* m_parent;
211 bool m_isRunning;
212 int m_exitCode;
213 PipeProcess* m_nextProc;
214 wxFileInputStream* m_input;
215 wxFileOutputStream* m_output;
216 int m_lastRead, m_lastWrite;
217 char m_buffer[BUF_SIZE];
218 wxString m_inputLine;
219 wxString m_errorLine;
220 };
221
222 //////////////////////////////////////////////////////////////////////////////
223 //////////////////////////// Execute /////////////////////////////////////////
224 //////////////////////////////////////////////////////////////////////////////
225
Execute(wxString command,wxString inputFile,wxString outputFile)226 bool wxPipeExecute::Execute(wxString command, wxString inputFile, wxString outputFile) {
227 wxArrayString cmds;
228 if (command.Find(wxT("\"concat:")) == -1 && command.Find(wxT("filter:")) == -1) {
229 while (cmds.Count() < MAX_PROCESSES) {
230 int pos = command.find(wxT('|'));
231 if (pos < 0) {
232 cmds.Add(command.Strip(wxString::both));
233 break;
234 }
235 cmds.Add(command.Mid(0, pos).Strip(wxString::both));
236 command.Remove(0, pos + 1);
237 }
238 } else
239 cmds.Add(command);
240 int cnt = cmds.Count();
241
242 wxFileInputStream* input = NULL;
243 ;
244 wxFileOutputStream* output = NULL;
245 if (inputFile.length())
246 input = new wxFileInputStream(inputFile);
247 if (input && !input->IsOk())
248 return false;
249 if (outputFile.length())
250 output = new wxFileOutputStream(outputFile);
251 if (output && !output->IsOk())
252 return false;
253
254 PipeProcess* proc[MAX_PROCESSES];
255 long pid[MAX_PROCESSES];
256 PipeProcess* prevProc = NULL;
257 for (int i = cnt - 1; i >= 0; i--) {
258 proc[i] = new PipeProcess(this, prevProc, i == 0 ? input : NULL, i == cnt - 1 ? output : NULL);
259 prevProc = proc[i];
260 }
261 for (int i = 0; i < cnt; i++) {
262 wxString cmd = cmds[i];
263 #ifdef __WXMSW__
264 if (cmd.BeforeFirst(wxT(' ')).Find(wxT(':')) == -1)
265 cmd = wxGetAppPath() + cmd;
266 #elif __WXMAC__
267 if (cmd.length() > 0 && cmd[0] != wxT('/'))
268 cmd = wxGetAppPath() + cmd;
269 #endif
270 pid[i] = wxExecute(cmd, wxEXEC_ASYNC, proc[i]);
271 if (pid[i] <= 0)
272 return false;
273 }
274 bool hasInput = true;
275 while (proc[cnt - 1]->IsRunning() || hasInput) {
276 wxYield();
277 if (IsCanceled()) {
278 for (int i = 0; i < cnt; i++)
279 wxProcess::Kill(pid[i], wxSIGTERM, wxKILL_CHILDREN);
280 wxMilliSleep(200);
281 for (int i = 0; i < cnt; i++)
282 wxProcess::Kill(pid[i], wxSIGKILL, wxKILL_CHILDREN);
283 break;
284 }
285 hasInput = false;
286 for (int i = 0; i < cnt; i++)
287 hasInput = hasInput || proc[i]->HasInput();
288 if (!hasInput)
289 wxMilliSleep(50);
290 }
291 bool res = proc[cnt - 1]->GetExitCode() == 0;
292 for (int i = 0; i < cnt; i++)
293 if (IsCanceled())
294 proc[i]->Detach();
295 else
296 delete proc[i];
297 if (input)
298 delete input;
299 if (output)
300 delete output;
301
302 return res;
303 }
304