1 /*
2 This file is part of CDO. CDO is a collection of Operators to manipulate and analyse Climate model Data.
3
4 Author: Oliver Heidmann
5
6 */
7
8 #include "processManager.h"
9 #include "process.h"
10 #include "cdo_output.h"
11 #include "cdo_options.h"
12 #include "mpmo_color.h"
13 #include "fileStream.h"
14
15 #include <stack>
16 #include <mutex>
17
// Locked by the ProcessManager accessors in this file (get_num_processes,
// get_num_active_processes, get_process_from_id) while they read the process table.
static std::mutex processMutex;

// Detail text for parse failures: appended to by create_processes_from_input()
// and printed by handle_parse_error().
static std::string parse_err_msg = "";
21
22 void
run_processes()23 ProcessManager::run_processes()
24 {
25 for (auto &idProcessPair : m_processes)
26 {
27 if (idProcessPair.first)
28 {
29 /*TEMP*/
30 if (!Options::silentMode)
31 {
32 MpMO::Print(Green("%s: ") + "Process started", idProcessPair.second->prompt);
33 }
34 m_threadIDs.push_back(idProcessPair.second->run());
35 }
36 }
37 m_threadIDs.push_back(pthread_self());
38 // MpMO::PrintCerr(Green("%s: ") + "xProcess started", get_process_from_id(0).inq_promt());
39 get_process_from_id(0)->m_module.func(get_process_from_id(0).get());
40 }
41
42 void
kill_processes()43 ProcessManager::kill_processes()
44 {
45 for (auto threadID : m_threadIDs)
46 {
47 if (threadID != pthread_self())
48 {
49 pthread_cancel(threadID);
50 Debug(PROCESS_MANAGER, "process killed: %d", threadID);
51 }
52 }
53 }
54
55 int
get_num_variable_input_operators()56 ProcessManager::get_num_variable_input_operators()
57 {
58 int num = 0;
59 for (auto &process : m_processes)
60 {
61 if (process.second->input_is_variable()) num++;
62 }
63 return num;
64 }
65
66 void
validate_processes()67 ProcessManager::validate_processes()
68 {
69 for (auto &process : m_processes)
70 {
71 process.second->check_stream_cnt();
72 auto stat = process.second->m_status;
73 if (stat != ProcessStatus::Ok)
74 {
75 if (stat == ProcessStatus::TooFewStreams || stat == ProcessStatus::TooManyStreams)
76 if (get_num_variable_input_operators() > 1)
77 {
78 cdo_warning("Did you forget to use '[' and/or ']' for multiple variable input operators?");
79 cdo_warning("use option --variableInput, for description");
80 }
81 process.second->handle_process_err();
82 }
83 }
84 }
85
86 void
clear_processes()87 ProcessManager::clear_processes()
88 {
89 Debug(PROCESS_MANAGER, "Deleting Processes");
90 m_processes.clear();
91 m_numProcesses = 0;
92 m_numProcessesActive = 0;
93 }
94
95 const std::shared_ptr<Process>
create_process_from_command(const std::string & command)96 ProcessManager::create_process_from_command(const std::string &command)
97 {
98 Debug(PROCESS_MANAGER, "Creating new process for command: %s with ID: %d ", command, m_numProcesses);
99 const auto processID = m_numProcesses++;
100
101 const auto operatorName = extract_operator_name(command);
102 const auto arguments = get_operator_argv(command);
103 auto success = m_processes.insert(std::make_pair(processID, std::make_shared<Process>(processID, operatorName, arguments)));
104 if (!success.second) cdo_abort("Process %d could not be created", processID);
105
106 m_numProcessesActive++;
107 Debug(PROCESS_MANAGER, "m_numProcessesActive: %d", m_numProcessesActive);
108
109 if (processID >= MAX_PROCESS) cdo_abort("Limit of %d processes reached!", MAX_PROCESS);
110
111 return success.first->second;
112 }
113
114 int
get_num_processes(void)115 ProcessManager::get_num_processes(void)
116 {
117 std::unique_lock<std::mutex> locked_mutex(processMutex);
118 int pnums = m_processes.size();
119 return pnums;
120 }
121
122 int
get_num_active_processes(void)123 ProcessManager::get_num_active_processes(void)
124 {
125 std::unique_lock<std::mutex> locked_mutex(processMutex);
126 int pnums = m_numProcessesActive;
127 return pnums;
128 }
129
130 /**
131 * Handles process Creation for cdo command p_argvEntry
132 * Adds \p p_parentProcess as parent to the newly created process.
133 * Adds newly created process to \p p_parentProcess.
134 * Also checks if \p p_parentProcess accepts another processes streams
135 * as input and exits with error message if not.
136 */
137 const std::shared_ptr<Process>
create_child_process_for(const std::shared_ptr<Process> & p_parentProcess,const std::string & argvEntry)138 ProcessManager::create_child_process_for(const std::shared_ptr<Process> &p_parentProcess, const std::string &argvEntry)
139 {
140 if (p_parentProcess->m_module.restrictions == FilesOnly)
141 cdo_abort("operator -%s can not be used with pipes.", p_parentProcess->operatorName);
142
143 const std::shared_ptr<Process> &newProcess = create_process_from_command(argvEntry);
144
145 if (newProcess->m_module.streamOutCnt == 0)
146 cdo_abort("operator -%s can not take -%s with 0 outputs as input", p_parentProcess->operatorName, newProcess->operatorName);
147
148 Debug(PROCESS_MANAGER, "Adding new child %s (id: %d) to %s (idL %d)", newProcess->operatorName, newProcess->get_id(), p_parentProcess->operatorName, p_parentProcess->get_id());
149
150 p_parentProcess->add_child(newProcess);
151 newProcess->add_parent(p_parentProcess);
152
153 return newProcess;
154 }
155
156 std::set<std::string>
handle_first_operator(int p_argcStart,int argc,const std::vector<std::string> & argv,const std::shared_ptr<Process> & p_rootProcess)157 ProcessManager::handle_first_operator(int p_argcStart, int argc,const std::vector<std::string> &argv, const std::shared_ptr<Process> &p_rootProcess)
158 {
159 std::set<std::string> files;
160 for (int i = p_argcStart; i < argc; i++)
161 {
162 Debug(PROCESS_MANAGER, "Creating new pstream for output file: %s", argv[i]);
163 if (argv[i] == "]")
164 {
165 cdo_abort("missing output file");
166 }
167 if (argv[i][0] == '-')
168 {
169 cdo_abort("missing Output file, did you add a '-' to a file?");
170 }
171 p_rootProcess->add_file_out_stream(argv[i]);
172 files.insert(argv[i]);
173 }
174 return files;
175 }
176
177 void
check_single_bracket_only(const char * p_argvEntry,char p_bracketType)178 ProcessManager::check_single_bracket_only(const char *p_argvEntry, char p_bracketType)
179 {
180 if (strlen(p_argvEntry) != 1)
181 {
182 cdo_abort("Only single %c allowed", p_bracketType);
183 }
184 }
185
186 void
create_processes(int argc,const std::vector<std::string> & argv)187 ProcessManager::create_processes(int argc,const std::vector<std::string> &argv)
188 {
189 ParseStatus parseStatus = create_processes_from_input(argc, argv);
190 if (parseStatus != ParseStatus::Ok)
191 {
192 // Program Exits here
193 handle_parse_error(parseStatus);
194 }
195 validate_processes();
196 }
197
198 #include <algorithm>
199 static std::string
get_stack_as_string(std::stack<std::shared_ptr<Process>> p_stack)200 get_stack_as_string(std::stack<std::shared_ptr<Process>> p_stack)
201 {
202 std::string stackString = "";
203 std::vector<std::string> strVec;
204 while (!p_stack.empty())
205 {
206 strVec.push_back(p_stack.top()->operatorName);
207 p_stack.pop();
208 }
209 std::reverse(strVec.begin(), strVec.end());
210 for (auto strEle : strVec)
211 {
212 stackString += " " + strEle;
213 }
214 return stackString;
215 }
216
217 /* comment FOR DEVELOPERS ONLY (do not include in docu)
218 *
 * This is, so to speak, the parser for cdo console inputs.
220 *
 * This parser runs over every argv that comes after the cdo options. The
 * first thing that is done is processing the first operator; since this
 * operator is the only one that has output files, we can remove the output
 * files from our argv by limiting the argc. Obase operators are treated as if
 * they have a single output since the operator itself takes care of writing
 * and creating the files it needs for its output. We also create the first
 * process for the operator and push it on our stack. Our stack will contain
 * all operators that do not have all the in- or output they need. After the
 * first operator is handled the parser will go over each other element in
 * argv. Here 4 cases can happen. Only one of the 4 will happen in one
 * iteration.
232 *
233 * If an operator is found we create a new process and add this process as
234 * child to the process on top of the stack. Likewise we add the top process
235 * as parent to the new process. Then the new Process is added to the stack.
236 *
 * If the argv element represents a file (indicated by the missing '-' in
 * argv[i][0]), we create a file stream and add it to the process at the top
 * of the stack.
240 *
 * In case of a '[' or ']' we check that there is only one bracket, since we
 * decided not to allow multiple brackets in the same argv entry. Then we add
 * the top of the process stack to ('[') or remove the last entry from (']') a
 * list (named bracketOperators) which keeps track of which operators used a
 * bracket. This list allows us to 'mark' an operator so that it is only
 * removed in case of a ']'. The ']' indicates that the top process should be
 * removed and that it SHOULD have the correct number of inputs.
248 *
 * At the end of each iteration we remove all operators that have all their
 * inputs AND are not contained in our bracket operators list. So an unclosed
 * bracket will cause an intentional malfunction of the parser, as the operator
 * will not be removed and more inputs will be added. This will be found later
 * by our routine (Process::validate) that checks if every process has the
 * correct number of inputs and outputs and will throw an error.
255 */
256 std::vector<std::string>
get_operator_argv(std::string operatorArguments)257 ProcessManager::get_operator_argv(std::string operatorArguments)
258 {
259 std::vector<std::string> argument_vector;
260 Debug(PROCESS && strchr(operatorArguments.c_str(), ',') != nullptr, "Setting operator arguments: %s", operatorArguments);
261
262 constexpr char delimiter = ',';
263
264 auto pos = operatorArguments.find(delimiter);
265 if (pos != std::string::npos)
266 {
267 // remove operator name
268 operatorArguments.erase(0, pos + 1);
269
270 while ((pos = operatorArguments.find(delimiter)) != std::string::npos)
271 {
272 argument_vector.push_back(operatorArguments.substr(0, pos));
273 Debug("added argument %s", argument_vector.back());
274 operatorArguments.erase(0, pos + 1);
275 }
276 argument_vector.push_back(operatorArguments);
277 }
278 return argument_vector;
279 }
280
281
282 ParseStatus
create_processes_from_input(int argc,const std::vector<std::string> & argv)283 ProcessManager::create_processes_from_input(int argc, const std::vector<std::string> &argv)
284 {
285 Debug(PROCESS_MANAGER, "== Process Creation Start ==");
286 Debug(PROCESS_MANAGER, "operators: %s", argv_to_string(argv));
287
288 const std::shared_ptr<Process> &root_process = create_process_from_command(argv[0]);
289 int cntOutFiles = (int) root_process->m_module.streamOutCnt;
290
291 std::set<std::string> files;
292
293 unsigned int lastNonOutputIdx = argc - cntOutFiles;
294 if (cntOutFiles == -1)
295 {
296 root_process->set_obase(argv[argc - 1]);
297 cntOutFiles = 1;
298 lastNonOutputIdx = argc - 1;
299 }
300 else
301 {
302 if (lastNonOutputIdx <= 0) return ParseStatus::MissingOutFile;
303 files = handle_first_operator(lastNonOutputIdx, argc, argv, root_process);
304 }
305
306 std::shared_ptr<Process> currentProcess;
307 std::stack<std::shared_ptr<Process>> processStack;
308 std::vector<std::shared_ptr<Process>> bracketOperators;
309 const char *argvEntry;
310 int unclosedBrackets = 0;
311 unsigned int idx = 1;
312
313 processStack.push(root_process);
314 while (!processStack.empty() && idx < lastNonOutputIdx)
315 {
316 Debug(PROCESS_MANAGER, "%s", get_stack_as_string(processStack));
317 currentProcess = processStack.top();
318 Debug(PROCESS_MANAGER, "iteration %d, current argv: %s, currentProcess: %s", idx, argv[idx], currentProcess->m_operatorCommand);
319
320 argvEntry = argv[idx].c_str();
321 //------------------------------------------------------
322 // case: operator
323 if (argvEntry[0] == '-')
324 {
325 Debug(PROCESS_MANAGER, "Found new Operator: %s", argvEntry);
326 currentProcess = create_child_process_for(currentProcess, argvEntry);
327 if (currentProcess->m_module.restrictions == OnlyFirst)
328 {
329 parse_err_msg += "Operator " + std::string(argvEntry) + " can only be used if it is first in the operator chain";
330 return ParseStatus::OperatorNotFirst;
331 }
332 processStack.push(currentProcess);
333 }
334 // - - - - - - - - - - - - - - - - - - - - - - - - - - -
335 // case: bracket start
336 else if (argvEntry[0] == '[')
337 {
338 check_single_bracket_only(argvEntry, '[');
339 bracketOperators.push_back(currentProcess);
340 unclosedBrackets++;
341 }
342 // - - - - - - - - - - - - - - - - - - - - - - - - - - -
343 // case: bracket end
344 else if (argvEntry[0] == ']')
345 {
346 check_single_bracket_only(argvEntry, ']');
347 unclosedBrackets--;
348 bracketOperators.pop_back();
349 // this check is for double bracktets like -info [ [ file1 file2 file3 ] ] which can happen when cdo wildcards are used.
350 auto it = std::find(bracketOperators.begin(), bracketOperators.end(), processStack.top());
351 if (it == bracketOperators.end()) processStack.pop();
352 }
353 // - - - - - - - - - - - - - - - - - - - - - - - - - - -
354 // case: file
355 else if (currentProcess->m_module.streamInCnt != 0)
356 {
357 Debug(PROCESS_MANAGER, "adding in file to %s", currentProcess->operatorName);
358 if (files.find(argvEntry) == files.end())
359 {
360 currentProcess->add_file_in_stream(argvEntry);
361 }
362 else
363 {
364 parse_err_msg
365 += std::string(argvEntry) + " is used as an output, files can not be used as in- and output at the same time";
366 return ParseStatus::FileIsInAndOutput;
367 }
368 }
369 // -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
370 // remove finished
371 while (!processStack.empty() && processStack.top()->has_hall_inputs()
372 && std::find(bracketOperators.begin(), bracketOperators.end(), processStack.top()) == bracketOperators.end())
373 {
374 Debug(PROCESS_MANAGER, "Removing %s from stack", processStack.top()->operatorName);
375 processStack.pop();
376 }
377 //------------------------------------------------------
378 idx++;
379 }
380 //---------------------------------------------------------------
381 if (unclosedBrackets > 0) return ParseStatus::ClosingBracketMissing;
382 if (unclosedBrackets < 0) return ParseStatus::OpenBracketMissing;
383 if (idx < lastNonOutputIdx)
384 {
385 if (argv[idx][0] == ']') return ParseStatus::OpenBracketMissing;
386 return ParseStatus::UnprocessedInput;
387 }
388
389 Debug(PROCESS_MANAGER, "== Process Creation End ==");
390
391 set_process_num(m_processes.size());
392 FileStream::enableTimers(m_processes.size() == 1 && Threading::ompNumThreads == 1);
393
394 return ParseStatus::Ok;
395 }
396
397 void
handle_parse_error(ParseStatus p_errCode)398 ProcessManager::handle_parse_error(ParseStatus p_errCode)
399 {
400 switch (p_errCode)
401 {
402 case ParseStatus::ClosingBracketMissing:
403 {
404 cdo_abort("Missing ']'.");
405 break;
406 }
407 case ParseStatus::OpenBracketMissing:
408 {
409 cdo_abort("Missing '['.");
410 break;
411 }
412 case ParseStatus::UnprocessedInput:
413 {
414 cdo_abort("Unprocessed Input, could not process all Operators/Files");
415 break;
416 }
417 case ParseStatus::MissingOutFile:
418 {
419 cdo_abort("Missing out file for first operator");
420 break;
421 }
422 case ParseStatus::OperatorNotFirst:
423 {
424 cdo_abort("%s", parse_err_msg);
425 break;
426 }
427 case ParseStatus::MissingObase:
428 {
429 break;
430 }
431 case ParseStatus::FileIsInAndOutput:
432 {
433 cdo_abort("%s", parse_err_msg);
434 }
435 case ParseStatus::Ok:
436 {
437 return;
438 }
439 }
440 }
441
442 const std::shared_ptr<Process> &
get_process_from_id(int p_processID)443 ProcessManager::get_process_from_id(int p_processID)
444 {
445 std::unique_lock<std::mutex> locked_mutex(processMutex);
446
447 const auto process = m_processes.find(p_processID);
448 if (process == m_processes.end()) cdo_abort("Process with ID: %d not found", p_processID);
449
450 return process->second;
451 }
452