1 /////////////////////////////////////////////////////////////////////////////
2 // Name:        PipeExecute.cpp
3 // Purpose:     Pipe execute
4 // Author:      Alex Thuering
5 // Created:     23.08.2004
6 // RCS-ID:      $Id: PipeExecute.cpp,v 1.12 2016/08/04 16:32:15 ntalex Exp $
7 // Copyright:   (c) Alex Thuering
8 // Licence:     wxWindows licence
9 /////////////////////////////////////////////////////////////////////////////
10 
11 #include "PipeExecute.h"
12 #include "utils.h"
13 #include <wx/process.h>
14 #include <wx/wfstream.h>
15 #include <wx/txtstrm.h>
16 #include <wx/log.h>
17 
18 #define BUF_SIZE 4096
19 #define MAX_PROCESSES 5
20 
21 #ifndef __WXMSW__
22 
23 #include <unistd.h>
24 #include <fcntl.h>
25 class wxOutputStreamUnblocker: public wxFileOutputStream
26 {
27   public:
SetBlock(bool block=true)28 	void SetBlock(bool block = true)
29 	{
30 	  int flags = fcntl(m_file->fd(), F_GETFL) | O_NONBLOCK;
31 	  if (block)
32 		flags -= O_NONBLOCK;
33 	  fcntl(m_file->fd(), F_SETFL, flags);
34 	}
35 };
36 
37 #else
38 
39 #include <windows.h>
40 class wxPipeOutputStream: public wxOutputStream
41 {
42   public:
43     wxPipeOutputStream(HANDLE hOutput);
44     virtual ~wxPipeOutputStream();
45 
46   protected:
47     size_t OnSysWrite(const void *buffer, size_t len);
48 
49   protected:
50     HANDLE m_hOutput;
51 
52     DECLARE_NO_COPY_CLASS(wxPipeOutputStream)
53 };
54 class wxOutputStreamUnblocker: public wxPipeOutputStream {
55 public:
wxOutputStreamUnblocker(HANDLE hOutput)56     wxOutputStreamUnblocker(HANDLE hOutput) : wxPipeOutputStream(hOutput) {}
57 
SetBlock(bool block=true)58 	void SetBlock(bool block = true) {
59 		DWORD mode = PIPE_READMODE_BYTE | PIPE_NOWAIT;
60 		if (block)
61 			mode -= PIPE_NOWAIT;
62 		if (!SetNamedPipeHandleState(m_hOutput, &mode, NULL, NULL)) {
63 			wxLogLastError(_T("SetNamedPipeHandleState(PIPE_NOWAIT)"));
64 		}
65 	}
66 };
67 
68 #endif
69 
wxUnblockOutputStream(wxFileOutputStream & stream)70 void wxUnblockOutputStream(wxFileOutputStream& stream) {
71   ((wxOutputStreamUnblocker&)stream).SetBlock(false);
72 }
73 
74 class PipeProcess: public wxProcess {
75 public:
76 	/**
77 	 * Constructor
78 	 */
PipeProcess(wxPipeExecute * parent,PipeProcess * nextProc=NULL,wxFileInputStream * input=NULL,wxFileOutputStream * output=NULL)79 	PipeProcess(wxPipeExecute* parent, PipeProcess* nextProc = NULL, wxFileInputStream* input = NULL,
80 			wxFileOutputStream* output = NULL) :
81 			m_parent(parent), m_nextProc(nextProc), m_input(input), m_output(output) {
82 		m_isRunning = true;
83 		m_exitCode = -1;
84 		m_lastRead = m_lastWrite = 0;
85 		Redirect();
86 	}
87 
88 	/**
89 	 * Destructor
90 	 */
~PipeProcess()91 	~PipeProcess() {
92 		if (m_inputLine.length()) {
93 			m_parent->ProcessOutput(m_inputLine);
94 			m_inputLine = wxT("");
95 		}
96 		if (m_errorLine.length()) {
97 			m_parent->ProcessOutput(m_errorLine);
98 			m_errorLine = wxT("");
99 		}
100 	}
101 
102 	/**
103 	 * Reads data from given stream and prints out (see ProcessOutput)
104 	 */
DoGetFromStream(wxInputStream & in,wxString & line)105     void DoGetFromStream(wxInputStream& in, wxString& line) {
106 		wxTextInputStream tis(in);
107 		while (in.CanRead()) {
108 			wxYieldIfNeeded();
109 			while (in.CanRead()) {
110 				wxChar c = tis.GetChar();
111 				if (c == wxT('\n') || c == wxT('\r')) {
112 					if (line.length())
113 						m_parent->ProcessOutput(line);
114 					line = wxT("");
115 					break;
116 				}
117 				line += c;
118 			}
119 		}
120 	}
121 
122 	/**
123 	 * Reads from given stream and writes into process input stream
124 	 */
DoPutToStream(wxInputStream & in)125 	bool DoPutToStream(wxInputStream& in) {
126 		wxUnblockOutputStream((wxFileOutputStream&) *GetOutputStream());
127 		if (m_lastWrite == m_lastRead) {
128 			m_lastRead = in.Read(m_buffer, sizeof(m_buffer)).LastRead();
129 			wxLogNull log;
130 			m_lastWrite = GetOutputStream()->Write(m_buffer, m_lastRead).LastWrite();
131 		} else {
132 			wxLogNull log;
133 			m_lastWrite += GetOutputStream()->Write(m_buffer + m_lastWrite, m_lastRead - m_lastWrite).LastWrite();
134 		}
135 		return m_lastWrite == m_lastRead;
136 	}
137 
138 	/**
139 	 * Is some data lefts to write after last call of DoPutToStream()?
140 	 */
HasDataToPut()141 	bool HasDataToPut() {
142 		return m_lastWrite != m_lastRead;
143 	}
144 
145 	/**
146 	 * Process of input/output data
147 	 */
HasInput()148     bool HasInput() {
149 		bool hasInput = false;
150 
151 		// Read from input stream and write to process input
152 		if (IsRunning() && m_input && (m_input->CanRead() || HasDataToPut())) {
153 			if (DoPutToStream(*m_input) && m_input->Eof())
154 				CloseOutput();
155 			else
156 				hasInput = true;
157 		}
158 
159 		// read from process input
160 		if (IsInputAvailable() || (m_nextProc && m_nextProc->HasDataToPut())) {
161 			if (m_nextProc) // and write to next process
162 			{
163 				if (m_nextProc->IsRunning())
164 					m_nextProc->DoPutToStream(*GetInputStream());
165 				else
166 					return false;
167 			} else if (m_output) // and write in output fstream
168 			{
169 				while (GetInputStream()->CanRead()) {
170 					wxYield();
171 					char buffer[BUF_SIZE];
172 					int cnt = GetInputStream()->Read(buffer, sizeof(buffer)).LastRead();
173 					m_output->Write(buffer, cnt);
174 				}
175 			} else
176 				// and print out
177 				DoGetFromStream(*GetInputStream(), m_inputLine);
178 			hasInput = true;
179 		}
180 
181 		// read from process error stream and print out
182 		if (IsErrorAvailable()) {
183 			DoGetFromStream(*GetErrorStream(), m_errorLine);
184 			hasInput = true;
185 		}
186 
187 		// close input from next process if this process finished
188 		if (m_nextProc && !hasInput && !IsRunning()) {
189 			m_nextProc->CloseOutput();
190 			m_nextProc = NULL;
191 		}
192 
193 		return hasInput;
194 	}
195 
OnTerminate(int pid,int status)196 	void OnTerminate(int pid, int status) {
197 		m_exitCode = status;
198 		m_isRunning = false;
199 	}
200 
IsRunning() const201 	bool IsRunning() const {
202 		return m_isRunning;
203 	}
204 
GetExitCode() const205 	int GetExitCode() const {
206 		return m_exitCode;
207 	}
208 
209 protected:
210 	wxPipeExecute* m_parent;
211 	bool m_isRunning;
212 	int m_exitCode;
213 	PipeProcess* m_nextProc;
214 	wxFileInputStream* m_input;
215 	wxFileOutputStream* m_output;
216 	int m_lastRead, m_lastWrite;
217 	char m_buffer[BUF_SIZE];
218 	wxString m_inputLine;
219 	wxString m_errorLine;
220 };
221 
222 //////////////////////////////////////////////////////////////////////////////
223 //////////////////////////// Execute /////////////////////////////////////////
224 //////////////////////////////////////////////////////////////////////////////
225 
Execute(wxString command,wxString inputFile,wxString outputFile)226 bool wxPipeExecute::Execute(wxString command, wxString inputFile, wxString outputFile) {
227 	wxArrayString cmds;
228 	if (command.Find(wxT("\"concat:")) == -1 && command.Find(wxT("filter:")) == -1) {
229 		while (cmds.Count() < MAX_PROCESSES) {
230 			int pos = command.find(wxT('|'));
231 			if (pos < 0) {
232 				cmds.Add(command.Strip(wxString::both));
233 				break;
234 			}
235 			cmds.Add(command.Mid(0, pos).Strip(wxString::both));
236 			command.Remove(0, pos + 1);
237 		}
238 	} else
239 		cmds.Add(command);
240 	int cnt = cmds.Count();
241 
242 	wxFileInputStream* input = NULL;
243 	;
244 	wxFileOutputStream* output = NULL;
245 	if (inputFile.length())
246 		input = new wxFileInputStream(inputFile);
247 	if (input && !input->IsOk())
248 		return false;
249 	if (outputFile.length())
250 		output = new wxFileOutputStream(outputFile);
251 	if (output && !output->IsOk())
252 		return false;
253 
254 	PipeProcess* proc[MAX_PROCESSES];
255 	long pid[MAX_PROCESSES];
256 	PipeProcess* prevProc = NULL;
257 	for (int i = cnt - 1; i >= 0; i--) {
258 		proc[i] = new PipeProcess(this, prevProc, i == 0 ? input : NULL, i == cnt - 1 ? output : NULL);
259 		prevProc = proc[i];
260 	}
261 	for (int i = 0; i < cnt; i++) {
262 		wxString cmd = cmds[i];
263 #ifdef __WXMSW__
264 		if (cmd.BeforeFirst(wxT(' ')).Find(wxT(':')) == -1)
265 		cmd = wxGetAppPath() + cmd;
266 #elif __WXMAC__
267 		if (cmd.length() > 0 && cmd[0] != wxT('/'))
268 		cmd = wxGetAppPath() + cmd;
269 #endif
270 		pid[i] = wxExecute(cmd, wxEXEC_ASYNC, proc[i]);
271 		if (pid[i] <= 0)
272 			return false;
273 	}
274 	bool hasInput = true;
275 	while (proc[cnt - 1]->IsRunning() || hasInput) {
276 		wxYield();
277 		if (IsCanceled()) {
278 			for (int i = 0; i < cnt; i++)
279 				wxProcess::Kill(pid[i], wxSIGTERM, wxKILL_CHILDREN);
280 			wxMilliSleep(200);
281 			for (int i = 0; i < cnt; i++)
282 				wxProcess::Kill(pid[i], wxSIGKILL, wxKILL_CHILDREN);
283 			break;
284 		}
285 		hasInput = false;
286 		for (int i = 0; i < cnt; i++)
287 			hasInput = hasInput || proc[i]->HasInput();
288 		if (!hasInput)
289 			wxMilliSleep(50);
290 	}
291 	bool res = proc[cnt - 1]->GetExitCode() == 0;
292 	for (int i = 0; i < cnt; i++)
293 		if (IsCanceled())
294 			proc[i]->Detach();
295 		else
296 			delete proc[i];
297 	if (input)
298 		delete input;
299 	if (output)
300 		delete output;
301 
302 	return res;
303 }
304