1 /*
2   This file is part of CDO. CDO is a collection of Operators to manipulate and analyse Climate model Data.
3 
4   Author: Oliver Heidmann
5 
6 */
7 
8 #include "processManager.h"
9 #include "process.h"
10 #include "cdo_output.h"
11 #include "cdo_options.h"
12 #include "mpmo_color.h"
13 #include "fileStream.h"
14 
15 #include <stack>
16 #include <mutex>
17 
// Serialises the read accessors of ProcessManager defined in this file
// (get_num_processes, get_num_active_processes, get_process_from_id).
static std::mutex processMutex;

// Detail text for parse errors: appended to by create_processes_from_input()
// and printed by handle_parse_error().
static std::string parse_err_msg;
21 
22 void
run_processes()23 ProcessManager::run_processes()
24 {
25   for (auto &idProcessPair : m_processes)
26     {
27       if (idProcessPair.first)
28         {
29           /*TEMP*/
30           if (!Options::silentMode)
31             {
32               MpMO::Print(Green("%s: ") + "Process started", idProcessPair.second->prompt);
33             }
34           m_threadIDs.push_back(idProcessPair.second->run());
35         }
36     }
37   m_threadIDs.push_back(pthread_self());
38   // MpMO::PrintCerr(Green("%s: ") + "xProcess started", get_process_from_id(0).inq_promt());
39   get_process_from_id(0)->m_module.func(get_process_from_id(0).get());
40 }
41 
42 void
kill_processes()43 ProcessManager::kill_processes()
44 {
45   for (auto threadID : m_threadIDs)
46     {
47       if (threadID != pthread_self())
48         {
49           pthread_cancel(threadID);
50           Debug(PROCESS_MANAGER, "process killed: %d", threadID);
51         }
52     }
53 }
54 
55 int
get_num_variable_input_operators()56 ProcessManager::get_num_variable_input_operators()
57 {
58   int num = 0;
59   for (auto &process : m_processes)
60     {
61       if (process.second->input_is_variable()) num++;
62     }
63   return num;
64 }
65 
66 void
validate_processes()67 ProcessManager::validate_processes()
68 {
69   for (auto &process : m_processes)
70     {
71       process.second->check_stream_cnt();
72       auto stat = process.second->m_status;
73       if (stat != ProcessStatus::Ok)
74         {
75           if (stat == ProcessStatus::TooFewStreams || stat == ProcessStatus::TooManyStreams)
76             if (get_num_variable_input_operators() > 1)
77               {
78                 cdo_warning("Did you forget to use '[' and/or ']' for multiple variable input operators?");
79                 cdo_warning("use option --variableInput, for description");
80               }
81           process.second->handle_process_err();
82         }
83     }
84 }
85 
86 void
clear_processes()87 ProcessManager::clear_processes()
88 {
89   Debug(PROCESS_MANAGER, "Deleting Processes");
90   m_processes.clear();
91   m_numProcesses = 0;
92   m_numProcessesActive = 0;
93 }
94 
95 const std::shared_ptr<Process>
create_process_from_command(const std::string & command)96 ProcessManager::create_process_from_command(const std::string &command)
97 {
98   Debug(PROCESS_MANAGER, "Creating new process for command: %s with ID: %d ", command, m_numProcesses);
99   const auto processID = m_numProcesses++;
100 
101   const auto operatorName = extract_operator_name(command);
102   const auto arguments = get_operator_argv(command);
103   auto success = m_processes.insert(std::make_pair(processID, std::make_shared<Process>(processID, operatorName, arguments)));
104   if (!success.second) cdo_abort("Process %d could not be created", processID);
105 
106   m_numProcessesActive++;
107   Debug(PROCESS_MANAGER, "m_numProcessesActive: %d", m_numProcessesActive);
108 
109   if (processID >= MAX_PROCESS) cdo_abort("Limit of %d processes reached!", MAX_PROCESS);
110 
111   return success.first->second;
112 }
113 
114 int
get_num_processes(void)115 ProcessManager::get_num_processes(void)
116 {
117   std::unique_lock<std::mutex> locked_mutex(processMutex);
118   int pnums = m_processes.size();
119   return pnums;
120 }
121 
122 int
get_num_active_processes(void)123 ProcessManager::get_num_active_processes(void)
124 {
125   std::unique_lock<std::mutex> locked_mutex(processMutex);
126   int pnums = m_numProcessesActive;
127   return pnums;
128 }
129 
130 /**
131  * Handles process Creation for cdo command p_argvEntry
132  * Adds \p p_parentProcess as parent to the newly created process.
133  * Adds newly created process to \p p_parentProcess.
134  * Also checks if \p p_parentProcess accepts another processes streams
135  * as input and exits with error message if not.
136  */
137 const std::shared_ptr<Process>
create_child_process_for(const std::shared_ptr<Process> & p_parentProcess,const std::string & argvEntry)138 ProcessManager::create_child_process_for(const std::shared_ptr<Process> &p_parentProcess, const std::string &argvEntry)
139 {
140   if (p_parentProcess->m_module.restrictions == FilesOnly)
141     cdo_abort("operator -%s can not be used with pipes.", p_parentProcess->operatorName);
142 
143   const std::shared_ptr<Process> &newProcess = create_process_from_command(argvEntry);
144 
145   if (newProcess->m_module.streamOutCnt == 0)
146     cdo_abort("operator -%s can not take -%s  with 0 outputs as input", p_parentProcess->operatorName, newProcess->operatorName);
147 
148   Debug(PROCESS_MANAGER, "Adding new child %s (id: %d) to %s (idL %d)", newProcess->operatorName, newProcess->get_id(), p_parentProcess->operatorName, p_parentProcess->get_id());
149 
150   p_parentProcess->add_child(newProcess);
151   newProcess->add_parent(p_parentProcess);
152 
153   return newProcess;
154 }
155 
156 std::set<std::string>
handle_first_operator(int p_argcStart,int argc,const std::vector<std::string> & argv,const std::shared_ptr<Process> & p_rootProcess)157 ProcessManager::handle_first_operator(int p_argcStart, int argc,const std::vector<std::string> &argv, const std::shared_ptr<Process> &p_rootProcess)
158 {
159   std::set<std::string> files;
160   for (int i = p_argcStart; i < argc; i++)
161     {
162       Debug(PROCESS_MANAGER, "Creating new pstream for output file: %s", argv[i]);
163       if (argv[i] == "]")
164         {
165           cdo_abort("missing output file");
166         }
167       if (argv[i][0] == '-')
168         {
169           cdo_abort("missing Output file, did you add a '-' to a file?");
170         }
171       p_rootProcess->add_file_out_stream(argv[i]);
172       files.insert(argv[i]);
173     }
174   return files;
175 }
176 
177 void
check_single_bracket_only(const char * p_argvEntry,char p_bracketType)178 ProcessManager::check_single_bracket_only(const char *p_argvEntry, char p_bracketType)
179 {
180   if (strlen(p_argvEntry) != 1)
181     {
182       cdo_abort("Only single %c allowed", p_bracketType);
183     }
184 }
185 
186 void
create_processes(int argc,const std::vector<std::string> & argv)187 ProcessManager::create_processes(int argc,const std::vector<std::string> &argv)
188 {
189   ParseStatus parseStatus = create_processes_from_input(argc, argv);
190   if (parseStatus != ParseStatus::Ok)
191     {
192       // Program Exits here
193       handle_parse_error(parseStatus);
194     }
195   validate_processes();
196 }
197 
198 #include <algorithm>
199 static std::string
get_stack_as_string(std::stack<std::shared_ptr<Process>> p_stack)200 get_stack_as_string(std::stack<std::shared_ptr<Process>> p_stack)
201 {
202   std::string stackString = "";
203   std::vector<std::string> strVec;
204   while (!p_stack.empty())
205     {
206       strVec.push_back(p_stack.top()->operatorName);
207       p_stack.pop();
208     }
209   std::reverse(strVec.begin(), strVec.end());
210   for (auto strEle : strVec)
211     {
212       stackString += " " + strEle;
213     }
214   return stackString;
215 }
216 
217 /* comment FOR DEVELOPERS ONLY (do not include in docu)
218  *
 * This is, so to speak, the parser for cdo console inputs.
220  *
 *  This parser runs over every argv that comes after the cdo options.  The
 *  first thing that is done is processing the first operator. Since this
 *  operator is the only one that has output files, we can remove the output
 *  files from our argv by limiting the argc. Obase operators are treated as if
 *  they have a single output, since the operator itself takes care of creating
 *  and writing the files it needs for its output. We also create the first
 *  process for the operator and push it on our stack.  Our stack will contain
 *  all operators that do not have all the in- or outputs they need.  After the
 *  first operator is handled, the parser goes over every other element in
 *  argv.  Here 4 cases can happen. Only one of the 4 will happen in one
 *  iteration.
232  *
233  *  If an operator is found we create a new process and add this process as
234  *  child to the process on top of the stack. Likewise we add the top process
235  *  as parent to the new process. Then the new Process is added to the stack.
236  *
 *  If the argv element represents a file (indicated by the missing '-' in
 *  argv[i][0]), we create a file stream and add it to the process at the top
 *  of the stack.
240  *
 *  In case of a '[' or ']' we check if there is only one bracket, since we
 *  decided to not allow multiple brackets in the same argv entry. Then we add
 *  ('[') or remove (']') the top of the process stack to/from a list (named
 *  bracketOperators) which keeps track of which operators used a bracket.
 *  This list allows us to 'mark' an operator so that it is only removed in
 *  case of a ']'.  The ']' indicates that the top process should be removed
 *  and that it SHOULD have the correct number of inputs.
248  *
 *  At the end of each iteration we remove all operators that have all their
 *  inputs AND are not contained in our bracket operators list. So an unclosed
 *  bracket will cause an intentional malfunction of the parser, as the
 *  operator will not be removed and more inputs will be added. This will be
 *  found later by our routine (Process::validate) that checks if every process
 *  has the correct number of inputs and outputs and will throw an error.
255  */
256 std::vector<std::string>
get_operator_argv(std::string operatorArguments)257 ProcessManager::get_operator_argv(std::string operatorArguments)
258 {
259   std::vector<std::string> argument_vector;
260   Debug(PROCESS && strchr(operatorArguments.c_str(), ',') != nullptr, "Setting operator arguments: %s", operatorArguments);
261 
262   constexpr char delimiter = ',';
263 
264   auto pos = operatorArguments.find(delimiter);
265   if (pos != std::string::npos)
266     {
267       // remove operator name
268       operatorArguments.erase(0, pos + 1);
269 
270       while ((pos = operatorArguments.find(delimiter)) != std::string::npos)
271         {
272           argument_vector.push_back(operatorArguments.substr(0, pos));
273           Debug("added argument %s", argument_vector.back());
274           operatorArguments.erase(0, pos + 1);
275         }
276       argument_vector.push_back(operatorArguments);
277     }
278   return argument_vector;
279 }
280 
281 
282 ParseStatus
create_processes_from_input(int argc,const std::vector<std::string> & argv)283 ProcessManager::create_processes_from_input(int argc, const std::vector<std::string> &argv)
284 {
285   Debug(PROCESS_MANAGER, "== Process Creation Start ==");
286   Debug(PROCESS_MANAGER, "operators: %s", argv_to_string(argv));
287 
288   const std::shared_ptr<Process> &root_process = create_process_from_command(argv[0]);
289   int cntOutFiles = (int) root_process->m_module.streamOutCnt;
290 
291   std::set<std::string> files;
292 
293   unsigned int lastNonOutputIdx = argc - cntOutFiles;
294   if (cntOutFiles == -1)
295     {
296       root_process->set_obase(argv[argc - 1]);
297       cntOutFiles = 1;
298       lastNonOutputIdx = argc - 1;
299     }
300   else
301     {
302       if (lastNonOutputIdx <= 0) return ParseStatus::MissingOutFile;
303       files = handle_first_operator(lastNonOutputIdx, argc, argv, root_process);
304     }
305 
306   std::shared_ptr<Process> currentProcess;
307   std::stack<std::shared_ptr<Process>> processStack;
308   std::vector<std::shared_ptr<Process>> bracketOperators;
309   const char *argvEntry;
310   int unclosedBrackets = 0;
311   unsigned int idx = 1;
312 
313   processStack.push(root_process);
314   while (!processStack.empty() && idx < lastNonOutputIdx)
315     {
316       Debug(PROCESS_MANAGER, "%s", get_stack_as_string(processStack));
317       currentProcess = processStack.top();
318       Debug(PROCESS_MANAGER, "iteration %d, current argv: %s, currentProcess: %s", idx, argv[idx], currentProcess->m_operatorCommand);
319 
320       argvEntry = argv[idx].c_str();
321       //------------------------------------------------------
322       // case: operator
323       if (argvEntry[0] == '-')
324         {
325           Debug(PROCESS_MANAGER, "Found new Operator: %s", argvEntry);
326           currentProcess = create_child_process_for(currentProcess, argvEntry);
327           if (currentProcess->m_module.restrictions == OnlyFirst)
328             {
329               parse_err_msg += "Operator " + std::string(argvEntry) + " can only be used if it is first in the operator chain";
330               return ParseStatus::OperatorNotFirst;
331             }
332           processStack.push(currentProcess);
333         }
334       // - - - - - - - - - - - - - - - - - - - - - - - - - - -
335       // case: bracket start
336       else if (argvEntry[0] == '[')
337         {
338           check_single_bracket_only(argvEntry, '[');
339           bracketOperators.push_back(currentProcess);
340           unclosedBrackets++;
341         }
342       // - - - - - - - - - - - - - - - - - - - - - - - - - - -
343       // case: bracket end
344       else if (argvEntry[0] == ']')
345         {
346           check_single_bracket_only(argvEntry, ']');
347           unclosedBrackets--;
348           bracketOperators.pop_back();
349           // this check is for double bracktets like -info [ [ file1 file2 file3 ] ]  which can happen when cdo wildcards are used.
350           auto it = std::find(bracketOperators.begin(), bracketOperators.end(), processStack.top());
351           if (it == bracketOperators.end()) processStack.pop();
352         }
353       // - - - - - - - - - - - - - - - - - - - - - - - - - - -
354       // case: file
355       else if (currentProcess->m_module.streamInCnt != 0)
356         {
357           Debug(PROCESS_MANAGER, "adding in file to %s", currentProcess->operatorName);
358           if (files.find(argvEntry) == files.end())
359             {
360               currentProcess->add_file_in_stream(argvEntry);
361             }
362           else
363             {
364               parse_err_msg
365                   += std::string(argvEntry) + " is used as an output, files can not be used as in- and output at the same time";
366               return ParseStatus::FileIsInAndOutput;
367             }
368         }
369       // -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
370       // remove finished
371       while (!processStack.empty() && processStack.top()->has_hall_inputs()
372              && std::find(bracketOperators.begin(), bracketOperators.end(), processStack.top()) == bracketOperators.end())
373         {
374           Debug(PROCESS_MANAGER, "Removing %s from stack", processStack.top()->operatorName);
375           processStack.pop();
376         }
377       //------------------------------------------------------
378       idx++;
379     }
380   //---------------------------------------------------------------
381   if (unclosedBrackets > 0) return ParseStatus::ClosingBracketMissing;
382   if (unclosedBrackets < 0) return ParseStatus::OpenBracketMissing;
383   if (idx < lastNonOutputIdx)
384     {
385       if (argv[idx][0] == ']') return ParseStatus::OpenBracketMissing;
386       return ParseStatus::UnprocessedInput;
387     }
388 
389   Debug(PROCESS_MANAGER, "== Process Creation End ==");
390 
391   set_process_num(m_processes.size());
392   FileStream::enableTimers(m_processes.size() == 1 && Threading::ompNumThreads == 1);
393 
394   return ParseStatus::Ok;
395 }
396 
397 void
handle_parse_error(ParseStatus p_errCode)398 ProcessManager::handle_parse_error(ParseStatus p_errCode)
399 {
400   switch (p_errCode)
401     {
402     case ParseStatus::ClosingBracketMissing:
403       {
404         cdo_abort("Missing ']'.");
405         break;
406       }
407     case ParseStatus::OpenBracketMissing:
408       {
409         cdo_abort("Missing '['.");
410         break;
411       }
412     case ParseStatus::UnprocessedInput:
413       {
414         cdo_abort("Unprocessed Input, could not process all Operators/Files");
415         break;
416       }
417     case ParseStatus::MissingOutFile:
418       {
419         cdo_abort("Missing out file for first operator");
420         break;
421       }
422     case ParseStatus::OperatorNotFirst:
423       {
424         cdo_abort("%s", parse_err_msg);
425         break;
426       }
427     case ParseStatus::MissingObase:
428       {
429         break;
430       }
431     case ParseStatus::FileIsInAndOutput:
432       {
433         cdo_abort("%s", parse_err_msg);
434       }
435     case ParseStatus::Ok:
436       {
437         return;
438       }
439     }
440 }
441 
442 const std::shared_ptr<Process> &
get_process_from_id(int p_processID)443 ProcessManager::get_process_from_id(int p_processID)
444 {
445   std::unique_lock<std::mutex> locked_mutex(processMutex);
446 
447   const auto process = m_processes.find(p_processID);
448   if (process == m_processes.end()) cdo_abort("Process with ID: %d not found", p_processID);
449 
450   return process->second;
451 }
452