1 /*
2 This file is part of CDO. CDO is a collection of Operators to manipulate and analyse Climate model Data.
3
4 Author: Uwe Schulzweida
5 Oliver Heidmann
6
7 */
8
9 #ifdef HAVE_CONFIG_H
10 #include "config.h"
11 #endif
12
13 #ifdef HAVE_PTHREAD_H
14 #include <pthread.h>
15 #endif
16
17 #ifdef _OPENMP
18 #include <omp.h>
19 #endif
20
21 #include <cassert>
22
23 // Debug and message includes
24 #include <cdi.h>
25
26 #include "cdo_options.h"
27 #include "cdo_default_values.h"
28 #include "process.h"
29
30 thread_local Process* localProcess;
31 static int NumProcessActive = 0;
32
33 int
cdo_filetype(void)34 cdo_filetype(void)
35 {
36 if (CdoDefault::FileType == CDI_UNDEFID)
37 {
38 CdoDefault::FileType = CDI_FILETYPE_GRB;
39 if (Options::cdoVerbose) cdo_print("Set default filetype to GRIB1");
40 }
41
42 return CdoDefault::FileType;
43 }
44
45 Process &
process_self(void)46 process_self(void)
47 {
48 return *localProcess;
49 }
50
51 void
process_def_var_num(int nvars)52 process_def_var_num(int nvars)
53 {
54 localProcess->nvars += nvars;
55 }
56
57 /* cdoStreamInqTimeStep(...)
58 * sets the given p_pstreamptr to given tsID and returns the number of records this timestep contains
59 */
60 int
cdo_stream_inq_timestep(CdoStreamID p_pstreamptr,int tsID)61 cdo_stream_inq_timestep(CdoStreamID p_pstreamptr, int tsID)
62 {
63 Debug(PROCESS, "%s pstreamID %d", p_pstreamptr->m_name, p_pstreamptr->get_id());
64 int nrecs = p_pstreamptr->inq_timestep(tsID);
65 if (nrecs && tsID == p_pstreamptr->getTsID())
66 {
67 localProcess->ntimesteps++;
68 Debug(PROCESS, "Timestep cnt for localProcess: %s: %d", localProcess->prompt, localProcess->ntimesteps);
69 }
70 return nrecs;
71 }
72
73 int
cdo_operator_argc(void)74 cdo_operator_argc(void)
75 {
76 return localProcess->get_oper_argc();
77 }
78
79 const std::string &
cdo_operator_argv(size_t idx)80 cdo_operator_argv(size_t idx)
81 {
82 const auto argc = localProcess->get_oper_argc();
83 assert(((void) "Internal error: argument index out of bounds!", (idx < argc)));
84
85 return localProcess->m_oargv[idx];
86 }
87
88 const std::vector<std::string> &
cdo_get_oper_argv()89 cdo_get_oper_argv()
90 {
91 return localProcess->m_oargv;
92 }
93
94 void
operator_check_argc(int numargs)95 operator_check_argc(int numargs)
96 {
97 const int argc = localProcess->get_oper_argc();
98
99 if (argc < numargs)
100 cdo_abort("Too few arguments! Need %d found %d.", numargs, argc);
101 else if (argc > numargs)
102 cdo_abort("Too many arguments! Need %d found %d.", numargs, argc);
103 }
104
105 static void
print_enter(const char * prompt,const char * enter)106 print_enter(const char *prompt, const char *enter)
107 {
108 set_text_color(stderr, BRIGHT, MAGENTA);
109 fprintf(stderr, "%-16s : ", prompt);
110 reset_text_color(stderr);
111 fprintf(stderr, "Enter %s > ", enter);
112 }
113
114 static std::string
get_input()115 get_input()
116 {
117 std::string line;
118 std::string fline = "";
119
120 do
121 {
122 std::getline(std::cin, line);
123 // std::cout << line << std::endl;
124 fline += line;
125 }
126 while (std::string(line).find("\\") != std::string::npos);
127
128 return fline;
129 }
130
131 void
operator_input_arg(const char * enter)132 operator_input_arg(const char *enter)
133 {
134 while (true)
135 {
136 if (localProcess->get_oper_argc() != 0) return;
137
138 if (enter) print_enter(localProcess->prompt, enter);
139 std::stringstream stringStream(get_input());
140
141 std::string line;
142 while (std::getline(stringStream, line))
143 {
144 std::size_t prev = 0, pos;
145 while ((pos = line.find_first_of(", \\", prev)) != std::string::npos)
146 {
147 if (pos > prev) localProcess->m_oargv.push_back(line.substr(prev, pos - prev));
148 prev = pos + 1;
149 }
150 if (prev < line.length()) localProcess->m_oargv.push_back(line.substr(prev, std::string::npos));
151 }
152 }
153 }
154
155 int
cdo_operator_add(const char * name,int f1,int f2,const char * enter)156 cdo_operator_add(const char *name, int f1, int f2, const char *enter)
157 {
158 return localProcess->operator_add(name, f1, f2, enter);
159 }
160
161 int
cdo_operator_id(void)162 cdo_operator_id(void)
163 {
164 return localProcess->get_operator_id();
165 }
166
167 int
cdo_operator_f1(int operID)168 cdo_operator_f1(int operID)
169 {
170 return localProcess->oper[operID].f1;
171 }
172
173 int
cdo_operator_f2(int operID)174 cdo_operator_f2(int operID)
175 {
176 return localProcess->oper[operID].f2;
177 }
178
179 const char *
cdo_operator_name(int operID)180 cdo_operator_name(int operID)
181 {
182 return localProcess->oper[operID].name.c_str();
183 }
184
185 const char *
cdo_operator_enter(int operID)186 cdo_operator_enter(int operID)
187 {
188 return localProcess->oper[operID].enter;
189 }
190
191 int
cdo_stream_number()192 cdo_stream_number()
193 {
194 return operator_stream_number(localProcess->operatorName);
195 }
196
197 int
cdo_stream_cnt(void)198 cdo_stream_cnt(void)
199 {
200 return localProcess->m_streamCnt;
201 }
202
203 CdoStreamID
cdo_open_read(int inStreamIDX)204 cdo_open_read(int inStreamIDX)
205 {
206 Debug(PROCESS, "Getting in stream %d of process %d", inStreamIDX, localProcess->m_ID);
207
208 if (localProcess->get_stream_cnt_in() < inStreamIDX || inStreamIDX < 0)
209 cdo_abort("instream %d of process %d not found", inStreamIDX, localProcess->m_ID);
210
211 const auto inStream = localProcess->inputStreams[inStreamIDX];
212 inStream->open_read();
213
214 return inStream;
215 }
216
217 int
cdoStreamOpenRead(int inStreamIDX)218 cdoStreamOpenRead(int inStreamIDX)
219 {
220 return cdo_open_read(inStreamIDX)->get_id(); // return ID
221 }
222
223 /*parameters:
224 * p_outStreamIDX: In operator defined out stream ID
225 * filetype : Currently not used in operators! Default value is CDI_UNDEFID.
226 */
227 CdoStreamID
cdo_open_write(int p_outStreamIDX,int filetype)228 cdo_open_write(int p_outStreamIDX, int filetype)
229 {
230 if (filetype == CDI_UNDEFID) filetype = cdo_filetype();
231 Debug(PROCESS, "Getting out stream %d of process %d", p_outStreamIDX, localProcess->m_ID);
232
233 const int outStreamIDX = p_outStreamIDX - localProcess->inputStreams.size();
234 if (outStreamIDX > localProcess->get_stream_cnt_out() || outStreamIDX < 0)
235 {
236 cdo_abort("outstream %d of %d not found. Called with streamIdx = %d", outStreamIDX, localProcess->m_ID, p_outStreamIDX);
237 }
238
239 const auto outStream = localProcess->outputStreams[outStreamIDX];
240 outStream->open_write(filetype);
241
242 return outStream;
243 }
244
245 CdoStreamID
cdo_open_write(const std::string & p_filename,int filetype)246 cdo_open_write(const std::string &p_filename, int filetype)
247 {
248 if (filetype == CDI_UNDEFID) filetype = cdo_filetype();
249
250 localProcess->add_file_out_stream(p_filename);
251
252 const auto pstreamID = localProcess->outputStreams.back()->open_write(filetype);
253 if (pstreamID == -1) cdo_abort("Could not create pstream for file: %s", p_filename);
254
255 return localProcess->outputStreams.back();
256 }
257
258 CdoStreamID
cdo_open_append(int p_outFileIndex)259 cdo_open_append(int p_outFileIndex)
260 {
261 const auto streamIndex = p_outFileIndex - localProcess->inputStreams.size();
262 const auto outStream = localProcess->outputStreams[streamIndex];
263
264 const auto fileID = outStream->open_append();
265 if (fileID < 0) cdi_open_error(fileID, "Open failed on >%s<", outStream->m_name.c_str());
266
267 return outStream;
268 }
269
270 int
cdoStreamOpenAppend(int p_outFileIndex)271 cdoStreamOpenAppend(int p_outFileIndex)
272 {
273 return cdo_open_append(p_outFileIndex)->get_id();
274 }
275
276 /**
277 * Returns the output stream name as std::string for \p outStreamID.
278 */
279 static const char *
cdoGetOutStreamName()280 cdoGetOutStreamName()
281 {
282 return localProcess->get_out_stream_name();
283 }
284
285 /**
286 * Returns the input stream name as std::string for inStreamID.
287 */
288
289 const std::shared_ptr<Process>
cdoGetInputChild(int p_inID)290 cdoGetInputChild(int p_inID)
291 {
292 for (const std::shared_ptr<Process>&processPtr : localProcess->childProcesses)
293 {
294 if (processPtr->has_out_stream(localProcess->inputStreams[p_inID])) return processPtr;
295 }
296 return nullptr;
297 }
298
299 static const char *
cdoGetInStreamName(int p_inStream)300 cdoGetInStreamName(int p_inStream)
301 {
302 return localProcess->inputStreams[p_inStream]->m_name.c_str();
303 }
304
305 const char *
cdo_get_stream_name(int p_streamIndex)306 cdo_get_stream_name(int p_streamIndex)
307 {
308 const char *streamName = nullptr;
309 Debug(PROCESS, "stridx %d", p_streamIndex);
310 if (p_streamIndex >= static_cast<int>(localProcess->inputStreams.size()))
311 {
312 Debug(PROCESS, "Getting output stream name %d", p_streamIndex);
313 streamName = cdoGetOutStreamName();
314 }
315 else
316 {
317 Debug(PROCESS, "Getting input stream name %d", p_streamIndex);
318 streamName = cdoGetInStreamName(p_streamIndex);
319 }
320
321 Debug(PROCESS, "StreamName is: %s", streamName);
322
323 return streamName;
324 }
325
326 std::string
cdo_get_command_from_in_stream(int p_streamIndex)327 cdo_get_command_from_in_stream(int p_streamIndex)
328 {
329 const std::shared_ptr<Process> process = cdoGetInputChild(p_streamIndex);
330 if (!process) // then this should be a file and cdo_get_stream_name can handle this
331 {
332 return cdo_get_stream_name(p_streamIndex);
333 }
334 return process->operatorName;
335 }
336
337 bool
cdo_assert_files_only()338 cdo_assert_files_only()
339 {
340 return localProcess->has_no_pipes();
341 }
342
343 std::string
cdo_get_obase()344 cdo_get_obase()
345 {
346 return localProcess->get_obase();
347 }
348
349 void
cdo_initialize(void * p_process)350 cdo_initialize(void *p_process)
351 {
352 #if defined(_OPENMP)
353 omp_set_num_threads(Threading::ompNumThreads); // Has to be called for every module (pthread)!
354 #endif
355 localProcess = (Process *) p_process;
356 #ifdef HAVE_LIBPTHREAD
357 localProcess->threadID = pthread_self();
358 #endif
359 Debug(PROCESS, "Initializing process: %s (id: %d)", localProcess->operatorName , localProcess->get_id());
360
361 #ifdef HAVE_LIBPTHREAD
362 Debug(PROCESS, "process %d thread %ld", localProcess->m_ID, pthread_self());
363 #endif
364 }
365
366 const char *
process_inq_prompt(void)367 process_inq_prompt(void)
368 {
369 return localProcess ? localProcess->inq_promt() : cdo::progname;
370 }
371
372 extern "C" size_t getPeakRSS();
373
374 void
cdo_finish(void)375 cdo_finish(void)
376 {
377 Debug(PROCESS, "Finishing process: %d", localProcess->m_ID);
378
379 #ifdef HAVE_LIBPTHREAD
380 Debug(PROCESS, "process %d thread %ld", localProcess->m_ID, pthread_self());
381 #endif
382 if (!Options::silentMode) localProcess->print_processed_values();
383
384 localProcess->set_inactive();
385 NumProcessActive--;
386 }
387
388 /* TODO move cdoClose internals into Pstream::close/ CdoStream::close */
389 void
cdo_stream_close(CdoStreamID pstreamPtr)390 cdo_stream_close(CdoStreamID pstreamPtr)
391 {
392 Debug(PROCESS, "Adding %d to pstream %d %s", localProcess->inq_nvals(), pstreamPtr->get_id(), pstreamPtr->m_name);
393 pstreamPtr->close();
394 }
395
396 /******************************************************************/
397 /* Functions that only work on streamID's and do not need process */
398 /******************************************************************/
399
400 int
cdo_stream_inq_vlist(CdoStreamID p_pstreamPtr)401 cdo_stream_inq_vlist(CdoStreamID p_pstreamPtr)
402 {
403 if (p_pstreamPtr == nullptr) return -1;
404
405 Debug(PROCESS, "Inquiring Vlist from pstream %d", p_pstreamPtr->get_id());
406 const auto vlistID = p_pstreamPtr->inq_vlist();
407 if (vlistNumber(vlistID) == CDI_COMP && cdo_stream_number() == CDI_REAL)
408 cdo_abort("Fields with complex numbers are not supported by this operator!");
409
410 if (vlistNumber(vlistID) == CDI_REAL && cdo_stream_number() == CDI_COMP)
411 cdo_abort("This operator needs fields with complex numbers!");
412
413 process_def_var_num(vlistNvars(vlistID));
414
415 return vlistID;
416 }
417
418 // - - - - - - -
419 void
cdo_write_record_f(CdoStreamID p_pstreamPtr,float * data,size_t nmiss)420 cdo_write_record_f(CdoStreamID p_pstreamPtr, float *data, size_t nmiss)
421 {
422 p_pstreamPtr->write_record(data, nmiss);
423 }
424
425 void
cdo_write_record(CdoStreamID p_pstreamPtr,double * data,size_t nmiss)426 cdo_write_record(CdoStreamID p_pstreamPtr, double *data, size_t nmiss)
427 {
428 p_pstreamPtr->write_record(data, nmiss);
429 }
430
431 void
cdo_write_record(CdoStreamID p_pstreamPtr,Field & field)432 cdo_write_record(CdoStreamID p_pstreamPtr, Field &field)
433 {
434 if (field.memType == MemType::Float)
435 cdo_write_record_f(p_pstreamPtr, field.vec_f.data(), field.nmiss);
436 else
437 cdo_write_record(p_pstreamPtr, field.vec_d.data(), field.nmiss);
438 }
439
440 void
cdo_write_record(CdoStreamID p_pstreamPtr,Field3D & field,int levelID,size_t nmiss)441 cdo_write_record(CdoStreamID p_pstreamPtr, Field3D &field, int levelID, size_t nmiss)
442 {
443 const auto offset = levelID * field.gridsize * field.nwpv;
444 if (field.memType == MemType::Float)
445 cdo_write_record_f(p_pstreamPtr, field.vec_f.data() + offset, nmiss);
446 else
447 cdo_write_record(p_pstreamPtr, field.vec_d.data() + offset, nmiss);
448 }
449 // - - - - - - -
450 void
cdo_def_vlist(CdoStreamID p_pstreamPtr,int vlistID)451 cdo_def_vlist(CdoStreamID p_pstreamPtr, int vlistID)
452 {
453 p_pstreamPtr->def_vlist(vlistID);
454 }
455 // - - - - - - -
456 void
cdo_def_timestep(CdoStreamID p_pstreamPtr,int tsID)457 cdo_def_timestep(CdoStreamID p_pstreamPtr, int tsID)
458 {
459 p_pstreamPtr->def_timestep(tsID);
460 }
461 // - - - - - - -
462 //
463 int
cdo_inq_filetype(CdoStreamID p_pstreamPtr)464 cdo_inq_filetype(CdoStreamID p_pstreamPtr)
465 {
466 return p_pstreamPtr->inqFileType();
467 }
468 // - - - - - - -
469 void
cdo_inq_grib_info(CdoStreamID p_streamPtr,int * intnum,float * fltnum,off_t * bignum)470 cdo_inq_grib_info(CdoStreamID p_streamPtr, int *intnum, float *fltnum, off_t *bignum)
471 {
472 streamInqGRIBinfo(p_streamPtr->m_fileID, intnum, fltnum, bignum);
473 }
474
475 // - - - - - - -
476 int
cdo_inq_byteorder(CdoStreamID p_pstreamPtr)477 cdo_inq_byteorder(CdoStreamID p_pstreamPtr)
478 {
479 return p_pstreamPtr->inqByteorder();
480 }
481
482 // - - - - - - -
483 void
cdo_read_record_f(CdoStreamID p_pstreamPtr,float * data,size_t * nmiss)484 cdo_read_record_f(CdoStreamID p_pstreamPtr, float *data, size_t *nmiss)
485 {
486 if (data == nullptr) cdo_abort("Data pointer not allocated (cdo_read_record)!");
487 p_pstreamPtr->read_record(data, nmiss);
488 }
489
490 void
cdo_read_record(CdoStreamID p_pstreamPtr,double * data,size_t * nmiss)491 cdo_read_record(CdoStreamID p_pstreamPtr, double *data, size_t *nmiss)
492 {
493 if (data == nullptr) cdo_abort("Data pointer not allocated (cdo_read_record)!");
494 p_pstreamPtr->read_record(data, nmiss);
495 }
496
497 void
cdo_read_record(CdoStreamID streamID,Field & field)498 cdo_read_record(CdoStreamID streamID, Field &field)
499 {
500 if (field.memType == MemType::Float)
501 cdo_read_record_f(streamID, field.vec_f.data(), &field.nmiss);
502 else
503 cdo_read_record(streamID, field.vec_d.data(), &field.nmiss);
504 }
505
506 void
cdo_read_record(CdoStreamID streamID,Field3D & field,int levelID,size_t * nmiss)507 cdo_read_record(CdoStreamID streamID, Field3D &field, int levelID, size_t *nmiss)
508 {
509 const auto offset = levelID * field.gridsize * field.nwpv;
510 if (field.memType == MemType::Float)
511 cdo_read_record_f(streamID, field.vec_f.data() + offset, nmiss);
512 else
513 cdo_read_record(streamID, field.vec_d.data() + offset, nmiss);
514 }
515 // - - - - - - -
516
517 void
cdo_copy_record(CdoStreamID pstreamPtrDest,CdoStreamID pstreamPtrSrc)518 cdo_copy_record(CdoStreamID pstreamPtrDest, CdoStreamID pstreamPtrSrc)
519 {
520 Debug(PROCESS, "pstreamIDdest = %d pstreamIDsrc = %d", pstreamPtrDest->get_id(), pstreamPtrSrc->get_id());
521 pstreamPtrDest->copyRecord(pstreamPtrSrc);
522 }
523
524 // - - - - - - -
525
526 void
cdo_inq_record(CdoStreamID pstreamptr,int * varID,int * levelID)527 cdo_inq_record(CdoStreamID pstreamptr, int *varID, int *levelID)
528 {
529 pstreamptr->inq_record(varID, levelID);
530 }
531
532 void
cdo_def_record(CdoStreamID pstreamptr,int varID,int levelID)533 cdo_def_record(CdoStreamID pstreamptr, int varID, int levelID)
534 {
535 pstreamptr->defRecord(varID, levelID);
536 }
537 // - - - - - - -
538
539 void
cdo_def_comp_type(CdoStreamID p_streamID,int p_cdi_compression_type)540 cdo_def_comp_type(CdoStreamID p_streamID, int p_cdi_compression_type)
541 {
542 streamDefCompType(p_streamID->m_fileID, p_cdi_compression_type);
543 }
544
545 bool
data_is_unchanged()546 data_is_unchanged()
547 {
548 const auto unchanged
549 = (localProcess->m_ID == 0 && localProcess->has_no_pipes() && Options::cdoRegulargrid == false && CdoDefault::FileType == -1
550 && CdoDefault::DataType == -1 && CdoDefault::Byteorder == -1 && Options::CDO_Memtype == MemType::Native);
551 return unchanged;
552 }
553
554 void
cdo_set_nan(double missval,size_t gridsize,double * array)555 cdo_set_nan(double missval, size_t gridsize, double *array)
556 {
557 if (std::isnan(missval))
558 {
559 constexpr double newmissval = -9.e33;
560 for (size_t i = 0; i < gridsize; ++i)
561 if (DBL_IS_EQUAL(array[i], missval)) array[i] = newmissval;
562 }
563 }
564