1 /*
2 This file is part of CDO. CDO is a collection of Operators to manipulate and analyse Climate model Data.
3
4 Author: Uwe Schulzweida
5 Oliver Heidmann
6
7 */
8
9 #ifdef HAVE_CONFIG_H
10 #include "config.h"
11 #endif
12
13 #ifdef HAVE_PTHREAD_H
14 #include <pthread.h>
15 #endif
16
17 #ifdef _OPENMP
18 #include <omp.h>
19 #endif
20
21 #include <stdio.h>
22 #include <iostream>
23 #include <string>
24 #include <sstream>
25
26 #include "process.h"
27 #include "cdo_wtime.h"
28 #include "util_string.h"
29 #include "cdo_options.h"
30 #include "fileStream.h"
31 #include "pipeStream.h"
32
// File-local counter read and written through get_process_num() / set_process_num().
static int processNum = 0;

// Returns the value last stored with set_process_num(), or 0 if nothing was stored yet.
int
get_process_num()
{
  const int current = processNum;
  return current;
}

// Stores p_num as the new process number. The value is taken as is, without validation.
void
set_process_num(int p_num)
{
  processNum = p_num;
}
46
Process(int p_ID,const std::string & p_operatorName,const std::vector<std::string> & p_arguments)47 Process::Process(int p_ID, const std::string &p_operatorName, const std::vector<std::string> &p_arguments)
48 : m_ID(p_ID), operatorName(p_operatorName)
49 {
50 m_isActive = true;
51 init_process(p_operatorName, p_arguments);
52 }
53
54 bool
input_is_variable()55 Process::input_is_variable()
56 {
57 return m_module.streamInCnt == -1;
58 }
59
60 void
init_process(const std::string & p_operatorName,const std::vector<std::string> & p_arguments)61 Process::init_process(const std::string &p_operatorName, const std::vector<std::string> &p_arguments)
62 {
63 startTime = cdo_get_wtime();
64 #ifdef HAVE_LIBPTHREAD
65 threadID = pthread_self();
66 #endif
67 m_oargv = p_arguments;
68 operatorName = get_original(p_operatorName);
69
70 m_module = get_module(p_operatorName);
71
72 def_prompt(); // has to be called after get operatorName
73 }
74
75 int
get_stream_cnt_in()76 Process::get_stream_cnt_in()
77 {
78 return inputStreams.size();
79 }
80
81 int
get_stream_cnt_out()82 Process::get_stream_cnt_out()
83 {
84 return outputStreams.size();
85 }
86
87 void
def_prompt()88 Process::def_prompt()
89 {
90 if (m_ID == 0)
91 snprintf(prompt, sizeof(prompt), "%s %s", cdo::progname, operatorName.c_str());
92 else
93 snprintf(prompt, sizeof(prompt), "%s(%d) %s", cdo::progname, m_ID, operatorName.c_str());
94 }
95
96 const char *
inq_promt() const97 Process::inq_promt() const
98 {
99 return prompt;
100 }
101
102 void
handle_process_err()103 Process::handle_process_err()
104 {
105 switch (m_status)
106 {
107 case ProcessStatus::UnlimitedIOCounts:
108 {
109 cdo_abort("I/O stream counts unlimited no allowed!");
110 break;
111 }
112 case ProcessStatus::MissInput:
113 {
114 cdo_abort("Input streams missing!");
115 break;
116 }
117 case ProcessStatus::MissOutput:
118 {
119 cdo_abort("Output streams missing!");
120 break;
121 }
122 case ProcessStatus::TooManyStreams:
123 case ProcessStatus::TooFewStreams:
124 {
125 const auto inCnt = m_module.streamInCnt;
126 auto outCnt = m_module.streamOutCnt;
127 const bool lobase = outCnt == -1;
128 if (lobase) outCnt = 1;
129
130 std::string caseCount = (m_status == ProcessStatus::TooManyStreams) ? "many" : "few";
131 std::string pluralIn = (inCnt > 1) ? "s" : "";
132 std::string pluralOut = (outCnt > 1) ? "s" : "";
133
134 std::stringstream errMsg;
135 errMsg << "Too " << caseCount << " streams specified! Operator " << operatorName << " needs " << inCnt << " input stream"
136 << pluralIn << " and " << outCnt << " output " << (lobase ? "basename" : "stream") << pluralOut << "!";
137 cdo_abort(errMsg.str());
138 break;
139 }
140 case ProcessStatus::Ok: break;
141 }
142 }
143
144 void
validate()145 Process::validate()
146 {
147 check_stream_cnt();
148 if (m_status != ProcessStatus::Ok) handle_process_err();
149 }
150
151 void
check_stream_cnt()152 Process::check_stream_cnt()
153 {
154 int streamCnt = 0;
155
156 int wantedStreamInCnt = m_module.streamInCnt;
157 int wantedStreamOutCnt = m_module.streamOutCnt;
158
159 int streamInCnt0 = wantedStreamInCnt;
160
161 bool obase = false;
162 if (wantedStreamOutCnt == -1)
163 {
164 wantedStreamOutCnt = 1;
165 obase = true;
166 }
167
168 if (wantedStreamInCnt == -1 && wantedStreamOutCnt == -1)
169 {
170 m_status = ProcessStatus::UnlimitedIOCounts;
171 }
172
173 // printf(" wantedStreamInCnt,wantedStreamOutCnt %d %d\n",
174 // wantedStreamInCnt,wantedStreamOutCnt);
175 else if (wantedStreamInCnt == -1)
176 {
177 wantedStreamInCnt = m_streamCnt - wantedStreamOutCnt;
178 if (wantedStreamInCnt < 1) m_status = ProcessStatus::MissInput;
179 }
180
181 else if (wantedStreamOutCnt == -1)
182 {
183 wantedStreamOutCnt = m_streamCnt - wantedStreamInCnt;
184 if (wantedStreamOutCnt < 1) m_status = ProcessStatus::MissOutput;
185 }
186 else
187 {
188 // printf(" wantedStreamInCnt,wantedStreamOutCnt %d %d\n",
189 // wantedStreamInCnt,wantedStreamOutCnt);
190
191 streamCnt = wantedStreamInCnt + wantedStreamOutCnt;
192 // printf(" streamCnt %d %d\n", m_streamCnt, streamCnt);
193
194 if (m_streamCnt > streamCnt)
195 m_status = ProcessStatus::TooManyStreams;
196 else if (m_streamCnt < streamCnt && !obase)
197 m_status = ProcessStatus::TooFewStreams;
198 else if (wantedStreamInCnt > (int) inputStreams.size())
199 m_status = ProcessStatus::TooFewStreams;
200
201 else if (wantedStreamInCnt == 1 && streamInCnt0 == -1)
202 m_status = ProcessStatus::Ok;
203 }
204 }
205
206 bool
has_hall_inputs()207 Process::has_hall_inputs()
208 {
209 if (m_module.streamInCnt == -1) return false;
210
211 return (m_module.streamInCnt == static_cast<short>(inputStreams.size()));
212 }
213
214 void
set_inactive()215 Process::set_inactive()
216 {
217 m_isActive = false;
218 }
219
220 int
operator_add(const char * name,int f1,int f2,const char * enter)221 Process::operator_add(const char *name, int f1, int f2, const char *enter)
222 {
223 const int operID = m_noper;
224
225 if (operID >= MAX_OPERATOR) cdo_abort("Maximum number of %d operators reached!", MAX_OPERATOR);
226
227 oper[m_noper] = { f1, f2, name, enter };
228
229 m_noper++;
230
231 return operID;
232 }
233
234 int
get_operator_id()235 Process::get_operator_id()
236 {
237 if (m_noper > 0)
238 {
239 for (int operID = 0; operID < m_noper; operID++)
240 {
241 if (operatorName == oper[operID].name) return operID;
242 }
243
244 cdo_abort("Operator not callable by this name! Name is: %s", operatorName);
245 }
246
247 cdo_abort("Operator not initialized!");
248
249 return -1;
250 }
251
252 void
add_file_in_stream(const std::string & file)253 Process::add_file_in_stream(const std::string &file)
254 {
255 inputStreams.push_back(std::make_shared<FileStream>(file));
256 m_streamCnt++;
257 }
258
259 void
add_file_out_stream(const std::string & file)260 Process::add_file_out_stream(const std::string &file)
261 {
262 if (file[0] == '-')
263 {
264 cdo_abort("Missing output file. Found an operator instead of filename: %s", file);
265 }
266 outputStreams.push_back(std::make_shared<FileStream>(file));
267 m_streamCnt++;
268 }
269
270 void
add_child(const std::shared_ptr<Process> & childProcess)271 Process::add_child(const std::shared_ptr<Process> &childProcess)
272 {
273 childProcesses.push_back(childProcess);
274 nchild = childProcesses.size();
275 add_pipe_in_stream();
276 }
277
278 void
add_pipe_in_stream()279 Process::add_pipe_in_stream()
280 {
281 #ifdef HAVE_LIBPTHREAD
282 inputStreams.push_back(std::make_shared<PipeStream>(m_ID));
283 m_streamCnt++;
284 #else
285 cdo_abort("Cannot use pipes, pthread support not compiled in!");
286 #endif
287 }
288
289 void
add_parent(const std::shared_ptr<Process> & parentProcess)290 Process::add_parent(const std::shared_ptr<Process> &parentProcess)
291 {
292 parentProcesses.push_back(parentProcess);
293 m_posInParent = parentProcess->inputStreams.size() - 1;
294 add_pipe_out_stream();
295 }
296
297 void
add_pipe_out_stream()298 Process::add_pipe_out_stream()
299 {
300 outputStreams.push_back(parentProcesses[0]->inputStreams[m_posInParent]);
301 m_streamCnt++;
302 }
303
304 pthread_t
run()305 Process::run()
306 {
307 Debug(PROCESS, "starting new thread for process %d", m_ID);
308 pthread_attr_t attr;
309 auto status = pthread_attr_init(&attr);
310 if (status) cdo_sys_error("pthread_attr_init failed for '%s'", operatorName.c_str());
311 status = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
312 if (status) cdo_sys_error("pthread_attr_setdetachstate failed for '%s'", operatorName.c_str());
313 /*
314 param.sched_priority = 0;
315 status = pthread_attr_setschedparam(&attr, ¶m);
316 if ( status ) cdo_sys_error("pthread_attr_setschedparam failed for '%s'", newarg+1);
317 */
318 /* status = pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED); */
319 /* if ( status ) cdo_sys_error("pthread_attr_setinheritsched failed for '%s'", newarg+1); */
320
321 int pthreadScope;
322 pthread_attr_getscope(&attr, &pthreadScope);
323
324 /* status = pthread_attr_setscope(&attr, PTHREAD_SCOPE_PROCESS); */
325 /* if ( status ) cdo_sys_error("pthread_attr_setscope failed for '%s'", newarg+1); */
326 /* If system scheduling scope is specified, then the thread is scheduled against all threads in the system */
327 /* pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); */
328
329 size_t stacksize = 0;
330 status = pthread_attr_getstacksize(&attr, &stacksize);
331 if (stacksize < 2097152)
332 {
333 stacksize = 2097152;
334 pthread_attr_setstacksize(&attr, stacksize);
335 }
336
337 pthread_t thrID;
338 const auto rval = pthread_create(&thrID, &attr, m_module.func, this);
339 if (rval != 0)
340 {
341 errno = rval;
342 cdo_sys_error("pthread_create failed for '%s'", operatorName.c_str());
343 }
344
345 m_isActive = true;
346
347 return thrID;
348 }
349
350 void
print_benchmarks(double p_walltime,const char * p_memstring)351 Process::print_benchmarks(double p_walltime, const char *p_memstring)
352 {
353 if (m_ID == 0)
354 {
355 const auto number_of_used_thread = get_process_num();
356 if (ITSME)
357 fprintf(stdout, " [%.2fs%s %dthread%s]", p_walltime, p_memstring, number_of_used_thread, ADD_PLURAL(number_of_used_thread));
358 else
359 fprintf(stdout, " [%.2fs%s]", p_walltime, p_memstring);
360 }
361 }
362
// local helper function
extern "C" size_t getPeakRSS();

// Writes the value of getPeakRSS() into p_memstring (at most memstringLen
// bytes) as " <number><unit>". The number is divided by 1024 until it has at
// most four digits or the largest unit (PB) is reached.
// p_memstring is left untouched when getPeakRSS() returns 0.
static void
getMaxMemString(char *p_memstring, size_t memstringLen)
{
  auto peak = getPeakRSS();
  if (!peak) return;

  const char *units[] = { "B", "KB", "MB", "GB", "TB", "PB" };
  const size_t lastUnit = sizeof(units) / sizeof(char *) - 1;

  size_t unit = 0;
  for (; peak > 9999 && unit < lastUnit; ++unit) peak /= 1024;

  snprintf(p_memstring, memstringLen, " %zu%s", peak, units[unit]);
}
382
383 void
print_processed_values()384 Process::print_processed_values()
385 {
386 set_text_color(stdout, GREEN);
387 fprintf(stdout, "%s: ", prompt);
388 reset_text_color(stdout);
389
390 const auto nvals = inq_nvals();
391
392 if (nvals > 0)
393 {
394 fprintf(stdout, "Processed %zu value%s from %d variable%s", nvals, ADD_PLURAL(nvals), nvars, ADD_PLURAL(nvars));
395 }
396 else if (nvars > 0)
397 {
398 fprintf(stdout, "Processed %d variable%s", nvars, ADD_PLURAL(nvars));
399 }
400
401 if ((nvals || nvars) && ntimesteps > 0) fprintf(stdout, " over %d timestep%s", ntimesteps, ADD_PLURAL(ntimesteps));
402
403 char memstring[32] = { "" };
404 if (m_ID == 0) getMaxMemString(memstring, sizeof(memstring));
405 if (!Options::silentMode) print_benchmarks(cdo_get_wtime() - startTime, memstring);
406
407 if (nvars > 0 || nvals > 0 || ntimesteps > 0 || m_ID == 0) fprintf(stdout, ".");
408 fprintf(stdout, "\n");
409 }
410
411 bool
has_out_stream(const CdoStreamID p_streamID)412 Process::has_out_stream(const CdoStreamID p_streamID)
413 {
414 for (const CdoStreamID &streamID : outputStreams)
415 {
416 if (streamID == p_streamID) return true;
417 }
418 return false;
419 }
420
421 bool
has_in_stream(const CdoStreamID p_streamID)422 Process::has_in_stream(const CdoStreamID p_streamID)
423 {
424 for (const CdoStreamID &streamID : inputStreams)
425 {
426 if (streamID == p_streamID) return true;
427 }
428 return false;
429 }
430
431 size_t
inq_nvals()432 Process::inq_nvals()
433 {
434 size_t nvals = 0;
435 for (size_t i = 0; i < inputStreams.size(); i++)
436 {
437 Debug(PROCESS, "Inquiring nvals from instream %s", inputStreams[i]->m_name);
438 nvals += inputStreams[i]->getNvals();
439 }
440 return nvals;
441 }
442
443 bool
has_no_pipes()444 Process::has_no_pipes()
445 {
446 return (childProcesses.size() == 0);
447 }
448
449 const char *
get_out_stream_name()450 Process::get_out_stream_name()
451 {
452 return outputStreams[0]->m_name.c_str();
453 }
454
455 size_t
get_oper_argc()456 Process::get_oper_argc()
457 {
458 return m_oargv.size();
459 }
460
461 std::string
get_argv(int p_idx)462 Process::get_argv(int p_idx)
463 {
464 if (!(p_idx > (int) get_oper_argc() && p_idx > 0))
465 cdo_abort("Process Argv not found. Idx: %d, Process argc: %d", p_idx, m_oargv.size());
466
467 return m_oargv[p_idx];
468 }
469
470 const std::string
get_obase()471 Process::get_obase()
472 {
473 return m_obase;
474 }
475
get_id()476 int Process::get_id(){
477 return m_ID;
478 }
479