1 /*
2   This file is part of CDO. CDO is a collection of Operators to manipulate and analyse Climate model Data.
3 
4   Author: Uwe Schulzweida
5           Oliver Heidmann
6 
7 */
8 
9 #ifdef HAVE_CONFIG_H
10 #include "config.h"
11 #endif
12 
13 #ifdef HAVE_PTHREAD_H
14 #include <pthread.h>
15 #endif
16 
17 #ifdef _OPENMP
18 #include <omp.h>
19 #endif
20 
21 #include <stdio.h>
22 #include <iostream>
23 #include <string>
24 #include <sstream>
25 
26 #include "process.h"
27 #include "cdo_wtime.h"
28 #include "util_string.h"
29 #include "cdo_options.h"
30 #include "fileStream.h"
31 #include "pipeStream.h"
32 
// File-local counter, only reachable through the two accessors below.
static int processNum = 0;

// Returns the current value of the file-local process counter.
int
get_process_num()
{
  return processNum;
}

// Overwrites the file-local process counter with p_num.
void
set_process_num(int p_num)
{
  processNum = p_num;
}
46 
Process(int p_ID,const std::string & p_operatorName,const std::vector<std::string> & p_arguments)47 Process::Process(int p_ID, const std::string &p_operatorName, const std::vector<std::string> &p_arguments)
48     : m_ID(p_ID), operatorName(p_operatorName)
49 {
50   m_isActive = true;
51   init_process(p_operatorName, p_arguments);
52 }
53 
54 bool
input_is_variable()55 Process::input_is_variable()
56 {
57   return m_module.streamInCnt == -1;
58 }
59 
60 void
init_process(const std::string & p_operatorName,const std::vector<std::string> & p_arguments)61 Process::init_process(const std::string &p_operatorName, const std::vector<std::string> &p_arguments)
62 {
63   startTime = cdo_get_wtime();
64 #ifdef HAVE_LIBPTHREAD
65   threadID = pthread_self();
66 #endif
67   m_oargv = p_arguments;
68   operatorName = get_original(p_operatorName);
69 
70   m_module = get_module(p_operatorName);
71 
72   def_prompt();  // has to be called after get operatorName
73 }
74 
75 int
get_stream_cnt_in()76 Process::get_stream_cnt_in()
77 {
78   return inputStreams.size();
79 }
80 
81 int
get_stream_cnt_out()82 Process::get_stream_cnt_out()
83 {
84   return outputStreams.size();
85 }
86 
87 void
def_prompt()88 Process::def_prompt()
89 {
90   if (m_ID == 0)
91     snprintf(prompt, sizeof(prompt), "%s    %s", cdo::progname, operatorName.c_str());
92   else
93     snprintf(prompt, sizeof(prompt), "%s(%d) %s", cdo::progname, m_ID, operatorName.c_str());
94 }
95 
96 const char *
inq_promt() const97 Process::inq_promt() const
98 {
99   return prompt;
100 }
101 
102 void
handle_process_err()103 Process::handle_process_err()
104 {
105   switch (m_status)
106     {
107     case ProcessStatus::UnlimitedIOCounts:
108       {
109         cdo_abort("I/O stream counts unlimited no allowed!");
110         break;
111       }
112     case ProcessStatus::MissInput:
113       {
114         cdo_abort("Input streams missing!");
115         break;
116       }
117     case ProcessStatus::MissOutput:
118       {
119         cdo_abort("Output streams missing!");
120         break;
121       }
122     case ProcessStatus::TooManyStreams:
123     case ProcessStatus::TooFewStreams:
124       {
125         const auto inCnt = m_module.streamInCnt;
126         auto outCnt = m_module.streamOutCnt;
127         const bool lobase = outCnt == -1;
128         if (lobase) outCnt = 1;
129 
130         std::string caseCount = (m_status == ProcessStatus::TooManyStreams) ? "many" : "few";
131         std::string pluralIn = (inCnt > 1) ? "s" : "";
132         std::string pluralOut = (outCnt > 1) ? "s" : "";
133 
134         std::stringstream errMsg;
135         errMsg << "Too " << caseCount << " streams specified! Operator " << operatorName << " needs " << inCnt << " input stream"
136                << pluralIn << " and " << outCnt << " output " << (lobase ? "basename" : "stream") << pluralOut << "!";
137         cdo_abort(errMsg.str());
138         break;
139       }
140     case ProcessStatus::Ok: break;
141     }
142 }
143 
144 void
validate()145 Process::validate()
146 {
147   check_stream_cnt();
148   if (m_status != ProcessStatus::Ok) handle_process_err();
149 }
150 
151 void
check_stream_cnt()152 Process::check_stream_cnt()
153 {
154   int streamCnt = 0;
155 
156   int wantedStreamInCnt = m_module.streamInCnt;
157   int wantedStreamOutCnt = m_module.streamOutCnt;
158 
159   int streamInCnt0 = wantedStreamInCnt;
160 
161   bool obase = false;
162   if (wantedStreamOutCnt == -1)
163     {
164       wantedStreamOutCnt = 1;
165       obase = true;
166     }
167 
168   if (wantedStreamInCnt == -1 && wantedStreamOutCnt == -1)
169     {
170       m_status = ProcessStatus::UnlimitedIOCounts;
171     }
172 
173   // printf(" wantedStreamInCnt,wantedStreamOutCnt %d %d\n",
174   // wantedStreamInCnt,wantedStreamOutCnt);
175   else if (wantedStreamInCnt == -1)
176     {
177       wantedStreamInCnt = m_streamCnt - wantedStreamOutCnt;
178       if (wantedStreamInCnt < 1) m_status = ProcessStatus::MissInput;
179     }
180 
181   else if (wantedStreamOutCnt == -1)
182     {
183       wantedStreamOutCnt = m_streamCnt - wantedStreamInCnt;
184       if (wantedStreamOutCnt < 1) m_status = ProcessStatus::MissOutput;
185     }
186   else
187     {
188       // printf(" wantedStreamInCnt,wantedStreamOutCnt %d %d\n",
189       // wantedStreamInCnt,wantedStreamOutCnt);
190 
191       streamCnt = wantedStreamInCnt + wantedStreamOutCnt;
192       // printf(" streamCnt %d %d\n", m_streamCnt, streamCnt);
193 
194       if (m_streamCnt > streamCnt)
195         m_status = ProcessStatus::TooManyStreams;
196       else if (m_streamCnt < streamCnt && !obase)
197         m_status = ProcessStatus::TooFewStreams;
198       else if (wantedStreamInCnt > (int) inputStreams.size())
199         m_status = ProcessStatus::TooFewStreams;
200 
201       else if (wantedStreamInCnt == 1 && streamInCnt0 == -1)
202         m_status = ProcessStatus::Ok;
203     }
204 }
205 
206 bool
has_hall_inputs()207 Process::has_hall_inputs()
208 {
209   if (m_module.streamInCnt == -1) return false;
210 
211   return (m_module.streamInCnt == static_cast<short>(inputStreams.size()));
212 }
213 
214 void
set_inactive()215 Process::set_inactive()
216 {
217   m_isActive = false;
218 }
219 
220 int
operator_add(const char * name,int f1,int f2,const char * enter)221 Process::operator_add(const char *name, int f1, int f2, const char *enter)
222 {
223   const int operID = m_noper;
224 
225   if (operID >= MAX_OPERATOR) cdo_abort("Maximum number of %d operators reached!", MAX_OPERATOR);
226 
227   oper[m_noper] = { f1, f2, name, enter };
228 
229   m_noper++;
230 
231   return operID;
232 }
233 
234 int
get_operator_id()235 Process::get_operator_id()
236 {
237   if (m_noper > 0)
238     {
239       for (int operID = 0; operID < m_noper; operID++)
240         {
241           if (operatorName == oper[operID].name) return operID;
242         }
243 
244       cdo_abort("Operator not callable by this name! Name is: %s", operatorName);
245     }
246 
247   cdo_abort("Operator not initialized!");
248 
249   return -1;
250 }
251 
252 void
add_file_in_stream(const std::string & file)253 Process::add_file_in_stream(const std::string &file)
254 {
255   inputStreams.push_back(std::make_shared<FileStream>(file));
256   m_streamCnt++;
257 }
258 
259 void
add_file_out_stream(const std::string & file)260 Process::add_file_out_stream(const std::string &file)
261 {
262   if (file[0] == '-')
263     {
264       cdo_abort("Missing output file. Found an operator instead of filename: %s", file);
265     }
266   outputStreams.push_back(std::make_shared<FileStream>(file));
267   m_streamCnt++;
268 }
269 
270 void
add_child(const std::shared_ptr<Process> & childProcess)271 Process::add_child(const std::shared_ptr<Process> &childProcess)
272 {
273   childProcesses.push_back(childProcess);
274   nchild = childProcesses.size();
275   add_pipe_in_stream();
276 }
277 
278 void
add_pipe_in_stream()279 Process::add_pipe_in_stream()
280 {
281 #ifdef HAVE_LIBPTHREAD
282   inputStreams.push_back(std::make_shared<PipeStream>(m_ID));
283   m_streamCnt++;
284 #else
285   cdo_abort("Cannot use pipes, pthread support not compiled in!");
286 #endif
287 }
288 
289 void
add_parent(const std::shared_ptr<Process> & parentProcess)290 Process::add_parent(const std::shared_ptr<Process> &parentProcess)
291 {
292   parentProcesses.push_back(parentProcess);
293   m_posInParent = parentProcess->inputStreams.size() - 1;
294   add_pipe_out_stream();
295 }
296 
297 void
add_pipe_out_stream()298 Process::add_pipe_out_stream()
299 {
300   outputStreams.push_back(parentProcesses[0]->inputStreams[m_posInParent]);
301   m_streamCnt++;
302 }
303 
304 pthread_t
run()305 Process::run()
306 {
307   Debug(PROCESS, "starting new thread for process %d", m_ID);
308   pthread_attr_t attr;
309   auto status = pthread_attr_init(&attr);
310   if (status) cdo_sys_error("pthread_attr_init failed for '%s'", operatorName.c_str());
311   status = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
312   if (status) cdo_sys_error("pthread_attr_setdetachstate failed for '%s'", operatorName.c_str());
313   /*
314     param.sched_priority = 0;
315     status = pthread_attr_setschedparam(&attr, &param);
316     if ( status ) cdo_sys_error("pthread_attr_setschedparam failed for '%s'", newarg+1);
317   */
318   /* status = pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED); */
319   /* if ( status ) cdo_sys_error("pthread_attr_setinheritsched failed for '%s'", newarg+1); */
320 
321   int pthreadScope;
322   pthread_attr_getscope(&attr, &pthreadScope);
323 
324   /* status = pthread_attr_setscope(&attr, PTHREAD_SCOPE_PROCESS); */
325   /* if ( status ) cdo_sys_error("pthread_attr_setscope failed for '%s'", newarg+1); */
326   /* If system scheduling scope is specified, then the thread is scheduled against all threads in the system */
327   /* pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); */
328 
329   size_t stacksize = 0;
330   status = pthread_attr_getstacksize(&attr, &stacksize);
331   if (stacksize < 2097152)
332     {
333       stacksize = 2097152;
334       pthread_attr_setstacksize(&attr, stacksize);
335     }
336 
337   pthread_t thrID;
338   const auto rval = pthread_create(&thrID, &attr, m_module.func, this);
339   if (rval != 0)
340     {
341       errno = rval;
342       cdo_sys_error("pthread_create failed for '%s'", operatorName.c_str());
343     }
344 
345   m_isActive = true;
346 
347   return thrID;
348 }
349 
350 void
print_benchmarks(double p_walltime,const char * p_memstring)351 Process::print_benchmarks(double p_walltime, const char *p_memstring)
352 {
353   if (m_ID == 0)
354     {
355       const auto number_of_used_thread = get_process_num();
356       if (ITSME)
357         fprintf(stdout, " [%.2fs%s %dthread%s]", p_walltime, p_memstring, number_of_used_thread, ADD_PLURAL(number_of_used_thread));
358       else
359         fprintf(stdout, " [%.2fs%s]", p_walltime, p_memstring);
360     }
361 }
362 
// Local helper: writes the value reported by getPeakRSS() as " <n><unit>"
// into p_memstring (at most memstringLen bytes, via snprintf).
// The value is divided by 1024 until it is at most 9999 or the largest unit
// (PB) is reached. When getPeakRSS() returns 0 the buffer is left untouched.
extern "C" size_t getPeakRSS();
static void
getMaxMemString(char *p_memstring, size_t memstringLen)
{
  auto peak = getPeakRSS();
  if (peak == 0) return;

  const char *units[] = { "B", "KB", "MB", "GB", "TB", "PB" };
  const size_t lastUnit = sizeof(units) / sizeof(units[0]) - 1;

  size_t unit = 0;
  for (; peak > 9999 && unit < lastUnit; ++unit) peak /= 1024;

  snprintf(p_memstring, memstringLen, " %zu%s", peak, units[unit]);
}
382 
383 void
print_processed_values()384 Process::print_processed_values()
385 {
386   set_text_color(stdout, GREEN);
387   fprintf(stdout, "%s: ", prompt);
388   reset_text_color(stdout);
389 
390   const auto nvals = inq_nvals();
391 
392   if (nvals > 0)
393     {
394       fprintf(stdout, "Processed %zu value%s from %d variable%s", nvals, ADD_PLURAL(nvals), nvars, ADD_PLURAL(nvars));
395     }
396   else if (nvars > 0)
397     {
398       fprintf(stdout, "Processed %d variable%s", nvars, ADD_PLURAL(nvars));
399     }
400 
401   if ((nvals || nvars) && ntimesteps > 0) fprintf(stdout, " over %d timestep%s", ntimesteps, ADD_PLURAL(ntimesteps));
402 
403   char memstring[32] = { "" };
404   if (m_ID == 0) getMaxMemString(memstring, sizeof(memstring));
405   if (!Options::silentMode) print_benchmarks(cdo_get_wtime() - startTime, memstring);
406 
407   if (nvars > 0 || nvals > 0 || ntimesteps > 0 || m_ID == 0) fprintf(stdout, ".");
408   fprintf(stdout, "\n");
409 }
410 
411 bool
has_out_stream(const CdoStreamID p_streamID)412 Process::has_out_stream(const CdoStreamID p_streamID)
413 {
414   for (const CdoStreamID &streamID : outputStreams)
415     {
416       if (streamID == p_streamID) return true;
417     }
418   return false;
419 }
420 
421 bool
has_in_stream(const CdoStreamID p_streamID)422 Process::has_in_stream(const CdoStreamID p_streamID)
423 {
424   for (const CdoStreamID &streamID : inputStreams)
425     {
426       if (streamID == p_streamID) return true;
427     }
428   return false;
429 }
430 
431 size_t
inq_nvals()432 Process::inq_nvals()
433 {
434   size_t nvals = 0;
435   for (size_t i = 0; i < inputStreams.size(); i++)
436     {
437       Debug(PROCESS, "Inquiring nvals from instream %s", inputStreams[i]->m_name);
438       nvals += inputStreams[i]->getNvals();
439     }
440   return nvals;
441 }
442 
443 bool
has_no_pipes()444 Process::has_no_pipes()
445 {
446   return (childProcesses.size() == 0);
447 }
448 
449 const char *
get_out_stream_name()450 Process::get_out_stream_name()
451 {
452   return outputStreams[0]->m_name.c_str();
453 }
454 
455 size_t
get_oper_argc()456 Process::get_oper_argc()
457 {
458   return m_oargv.size();
459 }
460 
461 std::string
get_argv(int p_idx)462 Process::get_argv(int p_idx)
463 {
464   if (!(p_idx > (int) get_oper_argc() && p_idx > 0))
465     cdo_abort("Process Argv not found. Idx: %d, Process argc: %d", p_idx, m_oargv.size());
466 
467   return m_oargv[p_idx];
468 }
469 
470 const std::string
get_obase()471 Process::get_obase()
472 {
473   return m_obase;
474 }
475 
get_id()476 int Process::get_id(){
477   return m_ID;
478 }
479