1 /*
2   This file is part of CDO. CDO is a collection of Operators to manipulate and analyse Climate model Data.
3 
4   Author: Uwe Schulzweida
5           Oliver Heidmann
6 
7 */
8 
9 #ifdef HAVE_CONFIG_H
10 #include "config.h"
11 #endif
12 
13 #ifdef HAVE_PTHREAD_H
14 #include <pthread.h>
15 #endif
16 
17 #ifdef _OPENMP
18 #include <omp.h>
19 #endif
20 
21 #include <cassert>
22 
23 // Debug and message includes
24 #include <cdi.h>
25 
26 #include "cdo_options.h"
27 #include "cdo_default_values.h"
28 #include "process.h"
29 
30 thread_local Process* localProcess;
31 static int NumProcessActive = 0;
32 
33 int
cdo_filetype(void)34 cdo_filetype(void)
35 {
36   if (CdoDefault::FileType == CDI_UNDEFID)
37     {
38       CdoDefault::FileType = CDI_FILETYPE_GRB;
39       if (Options::cdoVerbose) cdo_print("Set default filetype to GRIB1");
40     }
41 
42   return CdoDefault::FileType;
43 }
44 
45 Process &
process_self(void)46 process_self(void)
47 {
48   return *localProcess;
49 }
50 
51 void
process_def_var_num(int nvars)52 process_def_var_num(int nvars)
53 {
54   localProcess->nvars += nvars;
55 }
56 
57 /* cdoStreamInqTimeStep(...)
58  * sets the given p_pstreamptr to given tsID and returns the number of records this timestep contains
59  */
60 int
cdo_stream_inq_timestep(CdoStreamID p_pstreamptr,int tsID)61 cdo_stream_inq_timestep(CdoStreamID p_pstreamptr, int tsID)
62 {
63   Debug(PROCESS, "%s pstreamID %d", p_pstreamptr->m_name, p_pstreamptr->get_id());
64   int nrecs = p_pstreamptr->inq_timestep(tsID);
65   if (nrecs && tsID == p_pstreamptr->getTsID())
66     {
67       localProcess->ntimesteps++;
68       Debug(PROCESS, "Timestep cnt for localProcess: %s: %d", localProcess->prompt, localProcess->ntimesteps);
69     }
70   return nrecs;
71 }
72 
73 int
cdo_operator_argc(void)74 cdo_operator_argc(void)
75 {
76   return localProcess->get_oper_argc();
77 }
78 
79 const std::string &
cdo_operator_argv(size_t idx)80 cdo_operator_argv(size_t idx)
81 {
82   const auto argc = localProcess->get_oper_argc();
83   assert(((void) "Internal error: argument index out of bounds!", (idx < argc)));
84 
85   return localProcess->m_oargv[idx];
86 }
87 
88 const std::vector<std::string> &
cdo_get_oper_argv()89 cdo_get_oper_argv()
90 {
91   return localProcess->m_oargv;
92 }
93 
94 void
operator_check_argc(int numargs)95 operator_check_argc(int numargs)
96 {
97   const int argc = localProcess->get_oper_argc();
98 
99   if (argc < numargs)
100     cdo_abort("Too few arguments! Need %d found %d.", numargs, argc);
101   else if (argc > numargs)
102     cdo_abort("Too many arguments! Need %d found %d.", numargs, argc);
103 }
104 
105 static void
print_enter(const char * prompt,const char * enter)106 print_enter(const char *prompt, const char *enter)
107 {
108   set_text_color(stderr, BRIGHT, MAGENTA);
109   fprintf(stderr, "%-16s : ", prompt);
110   reset_text_color(stderr);
111   fprintf(stderr, "Enter %s > ", enter);
112 }
113 
114 static std::string
get_input()115 get_input()
116 {
117   std::string line;
118   std::string fline = "";
119 
120   do
121     {
122       std::getline(std::cin, line);
123       // std::cout << line << std::endl;
124       fline += line;
125     }
126   while (std::string(line).find("\\") != std::string::npos);
127 
128   return fline;
129 }
130 
131 void
operator_input_arg(const char * enter)132 operator_input_arg(const char *enter)
133 {
134   while (true)
135     {
136       if (localProcess->get_oper_argc() != 0) return;
137 
138       if (enter) print_enter(localProcess->prompt, enter);
139       std::stringstream stringStream(get_input());
140 
141       std::string line;
142       while (std::getline(stringStream, line))
143         {
144           std::size_t prev = 0, pos;
145           while ((pos = line.find_first_of(", \\", prev)) != std::string::npos)
146             {
147               if (pos > prev) localProcess->m_oargv.push_back(line.substr(prev, pos - prev));
148               prev = pos + 1;
149             }
150           if (prev < line.length()) localProcess->m_oargv.push_back(line.substr(prev, std::string::npos));
151         }
152     }
153 }
154 
155 int
cdo_operator_add(const char * name,int f1,int f2,const char * enter)156 cdo_operator_add(const char *name, int f1, int f2, const char *enter)
157 {
158   return localProcess->operator_add(name, f1, f2, enter);
159 }
160 
161 int
cdo_operator_id(void)162 cdo_operator_id(void)
163 {
164   return localProcess->get_operator_id();
165 }
166 
167 int
cdo_operator_f1(int operID)168 cdo_operator_f1(int operID)
169 {
170   return localProcess->oper[operID].f1;
171 }
172 
173 int
cdo_operator_f2(int operID)174 cdo_operator_f2(int operID)
175 {
176   return localProcess->oper[operID].f2;
177 }
178 
179 const char *
cdo_operator_name(int operID)180 cdo_operator_name(int operID)
181 {
182   return localProcess->oper[operID].name.c_str();
183 }
184 
185 const char *
cdo_operator_enter(int operID)186 cdo_operator_enter(int operID)
187 {
188   return localProcess->oper[operID].enter;
189 }
190 
191 int
cdo_stream_number()192 cdo_stream_number()
193 {
194   return operator_stream_number(localProcess->operatorName);
195 }
196 
197 int
cdo_stream_cnt(void)198 cdo_stream_cnt(void)
199 {
200   return localProcess->m_streamCnt;
201 }
202 
203 CdoStreamID
cdo_open_read(int inStreamIDX)204 cdo_open_read(int inStreamIDX)
205 {
206   Debug(PROCESS, "Getting in stream %d of process %d", inStreamIDX, localProcess->m_ID);
207 
208   if (localProcess->get_stream_cnt_in() < inStreamIDX || inStreamIDX < 0)
209     cdo_abort("instream %d of process %d not found", inStreamIDX, localProcess->m_ID);
210 
211   const auto inStream = localProcess->inputStreams[inStreamIDX];
212   inStream->open_read();
213 
214   return inStream;
215 }
216 
217 int
cdoStreamOpenRead(int inStreamIDX)218 cdoStreamOpenRead(int inStreamIDX)
219 {
220   return cdo_open_read(inStreamIDX)->get_id();  // return ID
221 }
222 
223 /*parameters:
224  *  p_outStreamIDX: In operator defined out stream ID
225  *  filetype      : Currently not used in operators! Default value is CDI_UNDEFID.
226  */
227 CdoStreamID
cdo_open_write(int p_outStreamIDX,int filetype)228 cdo_open_write(int p_outStreamIDX, int filetype)
229 {
230   if (filetype == CDI_UNDEFID) filetype = cdo_filetype();
231   Debug(PROCESS, "Getting out stream %d of process %d", p_outStreamIDX, localProcess->m_ID);
232 
233   const int outStreamIDX = p_outStreamIDX - localProcess->inputStreams.size();
234   if (outStreamIDX > localProcess->get_stream_cnt_out() || outStreamIDX < 0)
235     {
236       cdo_abort("outstream %d of %d not found. Called with streamIdx = %d", outStreamIDX, localProcess->m_ID, p_outStreamIDX);
237     }
238 
239   const auto outStream = localProcess->outputStreams[outStreamIDX];
240   outStream->open_write(filetype);
241 
242   return outStream;
243 }
244 
245 CdoStreamID
cdo_open_write(const std::string & p_filename,int filetype)246 cdo_open_write(const std::string &p_filename, int filetype)
247 {
248   if (filetype == CDI_UNDEFID) filetype = cdo_filetype();
249 
250   localProcess->add_file_out_stream(p_filename);
251 
252   const auto pstreamID = localProcess->outputStreams.back()->open_write(filetype);
253   if (pstreamID == -1) cdo_abort("Could not create pstream for file: %s", p_filename);
254 
255   return localProcess->outputStreams.back();
256 }
257 
258 CdoStreamID
cdo_open_append(int p_outFileIndex)259 cdo_open_append(int p_outFileIndex)
260 {
261   const auto streamIndex = p_outFileIndex - localProcess->inputStreams.size();
262   const auto outStream = localProcess->outputStreams[streamIndex];
263 
264   const auto fileID = outStream->open_append();
265   if (fileID < 0) cdi_open_error(fileID, "Open failed on >%s<", outStream->m_name.c_str());
266 
267   return outStream;
268 }
269 
270 int
cdoStreamOpenAppend(int p_outFileIndex)271 cdoStreamOpenAppend(int p_outFileIndex)
272 {
273   return cdo_open_append(p_outFileIndex)->get_id();
274 }
275 
276 /**
277  * Returns the output stream name as std::string for \p outStreamID.
278  */
279 static const char *
cdoGetOutStreamName()280 cdoGetOutStreamName()
281 {
282   return localProcess->get_out_stream_name();
283 }
284 
285 /**
286  * Returns the input stream name as std::string for inStreamID.
287  */
288 
289 const std::shared_ptr<Process>
cdoGetInputChild(int p_inID)290 cdoGetInputChild(int p_inID)
291 {
292   for (const std::shared_ptr<Process>&processPtr : localProcess->childProcesses)
293     {
294       if (processPtr->has_out_stream(localProcess->inputStreams[p_inID])) return processPtr;
295     }
296   return nullptr;
297 }
298 
299 static const char *
cdoGetInStreamName(int p_inStream)300 cdoGetInStreamName(int p_inStream)
301 {
302   return localProcess->inputStreams[p_inStream]->m_name.c_str();
303 }
304 
305 const char *
cdo_get_stream_name(int p_streamIndex)306 cdo_get_stream_name(int p_streamIndex)
307 {
308   const char *streamName = nullptr;
309   Debug(PROCESS, "stridx %d", p_streamIndex);
310   if (p_streamIndex >= static_cast<int>(localProcess->inputStreams.size()))
311     {
312       Debug(PROCESS, "Getting output stream name %d", p_streamIndex);
313       streamName = cdoGetOutStreamName();
314     }
315   else
316     {
317       Debug(PROCESS, "Getting input stream name %d", p_streamIndex);
318       streamName = cdoGetInStreamName(p_streamIndex);
319     }
320 
321   Debug(PROCESS, "StreamName is: %s", streamName);
322 
323   return streamName;
324 }
325 
326 std::string
cdo_get_command_from_in_stream(int p_streamIndex)327 cdo_get_command_from_in_stream(int p_streamIndex)
328 {
329   const std::shared_ptr<Process> process = cdoGetInputChild(p_streamIndex);
330   if (!process)  // then this should be a file and cdo_get_stream_name can handle this
331     {
332       return cdo_get_stream_name(p_streamIndex);
333     }
334   return process->operatorName;
335 }
336 
337 bool
cdo_assert_files_only()338 cdo_assert_files_only()
339 {
340   return localProcess->has_no_pipes();
341 }
342 
343 std::string
cdo_get_obase()344 cdo_get_obase()
345 {
346   return localProcess->get_obase();
347 }
348 
349 void
cdo_initialize(void * p_process)350 cdo_initialize(void *p_process)
351 {
352 #if defined(_OPENMP)
353   omp_set_num_threads(Threading::ompNumThreads);  // Has to be called for every module (pthread)!
354 #endif
355   localProcess = (Process *) p_process;
356 #ifdef HAVE_LIBPTHREAD
357   localProcess->threadID = pthread_self();
358 #endif
359   Debug(PROCESS, "Initializing process: %s (id: %d)", localProcess->operatorName , localProcess->get_id());
360 
361 #ifdef HAVE_LIBPTHREAD
362   Debug(PROCESS, "process %d thread %ld", localProcess->m_ID, pthread_self());
363 #endif
364 }
365 
366 const char *
process_inq_prompt(void)367 process_inq_prompt(void)
368 {
369   return localProcess ? localProcess->inq_promt() : cdo::progname;
370 }
371 
372 extern "C" size_t getPeakRSS();
373 
374 void
cdo_finish(void)375 cdo_finish(void)
376 {
377   Debug(PROCESS, "Finishing process: %d", localProcess->m_ID);
378 
379 #ifdef HAVE_LIBPTHREAD
380   Debug(PROCESS, "process %d thread %ld", localProcess->m_ID, pthread_self());
381 #endif
382   if (!Options::silentMode) localProcess->print_processed_values();
383 
384   localProcess->set_inactive();
385   NumProcessActive--;
386 }
387 
388 /* TODO move cdoClose internals into Pstream::close/ CdoStream::close */
389 void
cdo_stream_close(CdoStreamID pstreamPtr)390 cdo_stream_close(CdoStreamID pstreamPtr)
391 {
392   Debug(PROCESS, "Adding %d to pstream %d %s", localProcess->inq_nvals(), pstreamPtr->get_id(), pstreamPtr->m_name);
393   pstreamPtr->close();
394 }
395 
396 /******************************************************************/
397 /* Functions that only work on streamID's and do not need process */
398 /******************************************************************/
399 
400 int
cdo_stream_inq_vlist(CdoStreamID p_pstreamPtr)401 cdo_stream_inq_vlist(CdoStreamID p_pstreamPtr)
402 {
403   if (p_pstreamPtr == nullptr) return -1;
404 
405   Debug(PROCESS, "Inquiring Vlist from pstream %d", p_pstreamPtr->get_id());
406   const auto vlistID = p_pstreamPtr->inq_vlist();
407   if (vlistNumber(vlistID) == CDI_COMP && cdo_stream_number() == CDI_REAL)
408     cdo_abort("Fields with complex numbers are not supported by this operator!");
409 
410   if (vlistNumber(vlistID) == CDI_REAL && cdo_stream_number() == CDI_COMP)
411     cdo_abort("This operator needs fields with complex numbers!");
412 
413   process_def_var_num(vlistNvars(vlistID));
414 
415   return vlistID;
416 }
417 
418 // - - - - - - -
419 void
cdo_write_record_f(CdoStreamID p_pstreamPtr,float * data,size_t nmiss)420 cdo_write_record_f(CdoStreamID p_pstreamPtr, float *data, size_t nmiss)
421 {
422   p_pstreamPtr->write_record(data, nmiss);
423 }
424 
425 void
cdo_write_record(CdoStreamID p_pstreamPtr,double * data,size_t nmiss)426 cdo_write_record(CdoStreamID p_pstreamPtr, double *data, size_t nmiss)
427 {
428   p_pstreamPtr->write_record(data, nmiss);
429 }
430 
431 void
cdo_write_record(CdoStreamID p_pstreamPtr,Field & field)432 cdo_write_record(CdoStreamID p_pstreamPtr, Field &field)
433 {
434   if (field.memType == MemType::Float)
435     cdo_write_record_f(p_pstreamPtr, field.vec_f.data(), field.nmiss);
436   else
437     cdo_write_record(p_pstreamPtr, field.vec_d.data(), field.nmiss);
438 }
439 
440 void
cdo_write_record(CdoStreamID p_pstreamPtr,Field3D & field,int levelID,size_t nmiss)441 cdo_write_record(CdoStreamID p_pstreamPtr, Field3D &field, int levelID, size_t nmiss)
442 {
443   const auto offset = levelID * field.gridsize * field.nwpv;
444   if (field.memType == MemType::Float)
445     cdo_write_record_f(p_pstreamPtr, field.vec_f.data() + offset, nmiss);
446   else
447     cdo_write_record(p_pstreamPtr, field.vec_d.data() + offset, nmiss);
448 }
449 // - - - - - - -
450 void
cdo_def_vlist(CdoStreamID p_pstreamPtr,int vlistID)451 cdo_def_vlist(CdoStreamID p_pstreamPtr, int vlistID)
452 {
453   p_pstreamPtr->def_vlist(vlistID);
454 }
455 // - - - - - - -
456 void
cdo_def_timestep(CdoStreamID p_pstreamPtr,int tsID)457 cdo_def_timestep(CdoStreamID p_pstreamPtr, int tsID)
458 {
459   p_pstreamPtr->def_timestep(tsID);
460 }
461 // - - - - - - -
462 //
463 int
cdo_inq_filetype(CdoStreamID p_pstreamPtr)464 cdo_inq_filetype(CdoStreamID p_pstreamPtr)
465 {
466   return p_pstreamPtr->inqFileType();
467 }
468 // - - - - - - -
469 void
cdo_inq_grib_info(CdoStreamID p_streamPtr,int * intnum,float * fltnum,off_t * bignum)470 cdo_inq_grib_info(CdoStreamID p_streamPtr, int *intnum, float *fltnum, off_t *bignum)
471 {
472   streamInqGRIBinfo(p_streamPtr->m_fileID, intnum, fltnum, bignum);
473 }
474 
475 // - - - - - - -
476 int
cdo_inq_byteorder(CdoStreamID p_pstreamPtr)477 cdo_inq_byteorder(CdoStreamID p_pstreamPtr)
478 {
479   return p_pstreamPtr->inqByteorder();
480 }
481 
482 // - - - - - - -
483 void
cdo_read_record_f(CdoStreamID p_pstreamPtr,float * data,size_t * nmiss)484 cdo_read_record_f(CdoStreamID p_pstreamPtr, float *data, size_t *nmiss)
485 {
486   if (data == nullptr) cdo_abort("Data pointer not allocated (cdo_read_record)!");
487   p_pstreamPtr->read_record(data, nmiss);
488 }
489 
490 void
cdo_read_record(CdoStreamID p_pstreamPtr,double * data,size_t * nmiss)491 cdo_read_record(CdoStreamID p_pstreamPtr, double *data, size_t *nmiss)
492 {
493   if (data == nullptr) cdo_abort("Data pointer not allocated (cdo_read_record)!");
494   p_pstreamPtr->read_record(data, nmiss);
495 }
496 
497 void
cdo_read_record(CdoStreamID streamID,Field & field)498 cdo_read_record(CdoStreamID streamID, Field &field)
499 {
500   if (field.memType == MemType::Float)
501     cdo_read_record_f(streamID, field.vec_f.data(), &field.nmiss);
502   else
503     cdo_read_record(streamID, field.vec_d.data(), &field.nmiss);
504 }
505 
506 void
cdo_read_record(CdoStreamID streamID,Field3D & field,int levelID,size_t * nmiss)507 cdo_read_record(CdoStreamID streamID, Field3D &field, int levelID, size_t *nmiss)
508 {
509   const auto offset = levelID * field.gridsize * field.nwpv;
510   if (field.memType == MemType::Float)
511     cdo_read_record_f(streamID, field.vec_f.data() + offset, nmiss);
512   else
513     cdo_read_record(streamID, field.vec_d.data() + offset, nmiss);
514 }
515 // - - - - - - -
516 
517 void
cdo_copy_record(CdoStreamID pstreamPtrDest,CdoStreamID pstreamPtrSrc)518 cdo_copy_record(CdoStreamID pstreamPtrDest, CdoStreamID pstreamPtrSrc)
519 {
520   Debug(PROCESS, "pstreamIDdest = %d pstreamIDsrc = %d", pstreamPtrDest->get_id(), pstreamPtrSrc->get_id());
521   pstreamPtrDest->copyRecord(pstreamPtrSrc);
522 }
523 
524 // - - - - - - -
525 
526 void
cdo_inq_record(CdoStreamID pstreamptr,int * varID,int * levelID)527 cdo_inq_record(CdoStreamID pstreamptr, int *varID, int *levelID)
528 {
529   pstreamptr->inq_record(varID, levelID);
530 }
531 
532 void
cdo_def_record(CdoStreamID pstreamptr,int varID,int levelID)533 cdo_def_record(CdoStreamID pstreamptr, int varID, int levelID)
534 {
535   pstreamptr->defRecord(varID, levelID);
536 }
537 // - - - - - - -
538 
539 void
cdo_def_comp_type(CdoStreamID p_streamID,int p_cdi_compression_type)540 cdo_def_comp_type(CdoStreamID p_streamID, int p_cdi_compression_type)
541 {
542   streamDefCompType(p_streamID->m_fileID, p_cdi_compression_type);
543 }
544 
545 bool
data_is_unchanged()546 data_is_unchanged()
547 {
548   const auto unchanged
549       = (localProcess->m_ID == 0 && localProcess->has_no_pipes() && Options::cdoRegulargrid == false && CdoDefault::FileType == -1
550          && CdoDefault::DataType == -1 && CdoDefault::Byteorder == -1 && Options::CDO_Memtype == MemType::Native);
551   return unchanged;
552 }
553 
554 void
cdo_set_nan(double missval,size_t gridsize,double * array)555 cdo_set_nan(double missval, size_t gridsize, double *array)
556 {
557   if (std::isnan(missval))
558     {
559       constexpr double newmissval = -9.e33;
560       for (size_t i = 0; i < gridsize; ++i)
561         if (DBL_IS_EQUAL(array[i], missval)) array[i] = newmissval;
562     }
563 }
564