1 /*
2 This file is part of CDO. CDO is a collection of Operators to manipulate and analyse Climate model Data.
3
4 Author: Uwe Schulzweida
5
6 */
7
8 /*
9 This module contains the following operators:
10
11 Timstat timmin Time minimum
12 Timstat timmax Time maximum
13 Timstat timsum Time sum
14 Timstat timmean Time mean
15 Timstat timavg Time average
16 Timstat timvar Time variance
17 Timstat timvar1 Time variance [Normalize by (n-1)]
18 Timstat timstd Time standard deviation
19 Timstat timstd1 Time standard deviation [Normalize by (n-1)]
20 Hourstat hourmin Hourly minimum
21 Hourstat hourmax Hourly maximum
22 Hourstat hoursum Hourly sum
23 Hourstat hourmean Hourly mean
24 Hourstat houravg Hourly average
25 Hourstat hourvar Hourly variance
26 Hourstat hourvar1 Hourly variance [Normalize by (n-1)]
27 Hourstat hourstd Hourly standard deviation
28 Hourstat hourstd1 Hourly standard deviation [Normalize by (n-1)]
29 Daystat daymin Daily minimum
30 Daystat daymax Daily maximum
31 Daystat daysum Daily sum
32 Daystat daymean Daily mean
33 Daystat dayavg Daily average
34 Daystat dayvar Daily variance
35 Daystat dayvar1 Daily variance [Normalize by (n-1)]
36 Daystat daystd Daily standard deviation
37 Daystat daystd1 Daily standard deviation [Normalize by (n-1)]
38 Monstat monmin Monthly minimum
39 Monstat monmax Monthly maximum
40 Monstat monsum Monthly sum
41 Monstat monmean Monthly mean
42 Monstat monavg Monthly average
43 Monstat monvar Monthly variance
44 Monstat monvar1 Monthly variance [Normalize by (n-1)]
45 Monstat monstd Monthly standard deviation
46 Monstat monstd1 Monthly standard deviation [Normalize by (n-1)]
47 Yearstat yearmin Yearly minimum
48 Yearstat yearmax Yearly maximum
49 Yearstat yearsum Yearly sum
50 Yearstat yearmean Yearly mean
51 Yearstat yearavg Yearly average
52 Yearstat yearvar Yearly variance
53 Yearstat yearvar1 Yearly variance [Normalize by (n-1)]
54 Yearstat yearstd Yearly standard deviation
55 Yearstat yearstd1 Yearly standard deviation [Normalize by (n-1)]
56 */
57
58 #include <cdi.h>
59
60 #include <utility>
61
62 #include "cdo_options.h"
63 #include "cdo_vlist.h"
64 #include "process_int.h"
65 #include "cdo_task.h"
66 #include "timer.h"
67 #include "datetime.h"
68 #include "printinfo.h"
69 #include "util_date.h"
70 #include "param_conversion.h"
71
72 #define USE_CDI_STREAM 1
73
74 enum
75 {
76 HOUR_LEN = 4,
77 DAY_LEN = 6,
78 MON_LEN = 8,
79 YEAR_LEN = 10
80 };
81
82 struct ReadArguments
83 {
84 int tsIDnext;
85 #ifdef USE_CDI_STREAM
86 int streamID;
87 #else
88 CdoStreamID streamID;
89 #endif
90 int nrecs;
91 RecordInfo *recList;
92 FieldVector2D &vars;
93
ReadArgumentsReadArguments94 ReadArguments(FieldVector2D &input_vars) : tsIDnext(0), nrecs(0), recList(nullptr), vars(input_vars){};
95 };
96
97 static int num_recs = 0;
98
99 static void *
cdoReadTimestep(void * rarg)100 cdoReadTimestep(void *rarg)
101 {
102 ReadArguments *readarg = (ReadArguments *) rarg;
103 auto &input_vars = readarg->vars;
104 RecordInfo *recList = readarg->recList;
105 const auto streamID = readarg->streamID;
106 int tsIDnext = readarg->tsIDnext;
107 int nrecs = readarg->nrecs;
108
109 // timer_start(timer_read);
110
111 for (int recID = 0; recID < nrecs; ++recID)
112 {
113 int varID, levelID;
114 #ifdef USE_CDI_STREAM
115 streamInqRecord(streamID, &varID, &levelID);
116 #else
117 cdo_inq_record(streamID, &varID, &levelID);
118 #endif
119
120 if (tsIDnext == 1 && recList)
121 {
122 recList[recID].varID = varID;
123 recList[recID].levelID = levelID;
124 }
125
126 size_t nmiss;
127
128 #ifdef USE_CDI_STREAM
129 if (Options::CDO_Memtype == MemType::Float)
130 streamReadRecordF(streamID, input_vars[varID][levelID].vec_f.data(), &nmiss);
131 else
132 streamReadRecord(streamID, input_vars[varID][levelID].vec_d.data(), &nmiss);
133 #else
134 if (Options::CDO_Memtype == MemType::Float)
135 cdo_read_record_f(streamID, input_vars[varID][levelID].vec_f.data(), &nmiss);
136 else
137 cdo_read_record(streamID, input_vars[varID][levelID].vec_d.data(), &nmiss);
138 #endif
139 input_vars[varID][levelID].nmiss = nmiss;
140 }
141
142 // timer_stop(timer_read);
143
144 #ifdef USE_CDI_STREAM
145 num_recs = streamInqTimestep(streamID, tsIDnext);
146 #else
147 num_recs = cdo_stream_inq_timestep(streamID, tsIDnext);
148 #endif
149
150 return ((void *) &num_recs);
151 }
152
153 static void
vlistSetFrequency(int vlistID,int comparelen)154 vlistSetFrequency(int vlistID, int comparelen)
155 {
156 const char *freq = nullptr;
157 // clang-format off
158 if (comparelen == DAY_LEN) freq = "day";
159 else if (comparelen == MON_LEN) freq = "mon";
160 else if (comparelen == YEAR_LEN) freq = "year";
161 // clang-format on
162 if (freq) cdiDefAttTxt(vlistID, CDI_GLOBAL, "frequency", (int) strlen(freq), freq);
163 }
164
165 static void
addOperators(void)166 addOperators(void)
167 {
168 // clang-format off
169 cdo_operator_add("xtimmin", FieldFunc_Min, DATE_LEN, nullptr);
170 cdo_operator_add("xtimmax", FieldFunc_Max, DATE_LEN, nullptr);
171 cdo_operator_add("xtimsum", FieldFunc_Sum, DATE_LEN, nullptr);
172 cdo_operator_add("xtimmean", FieldFunc_Mean, DATE_LEN, nullptr);
173 cdo_operator_add("xtimavg", FieldFunc_Avg, DATE_LEN, nullptr);
174 cdo_operator_add("xtimvar", FieldFunc_Var, DATE_LEN, nullptr);
175 cdo_operator_add("xtimvar1", FieldFunc_Var1, DATE_LEN, nullptr);
176 cdo_operator_add("xtimstd", FieldFunc_Std, DATE_LEN, nullptr);
177 cdo_operator_add("xtimstd1", FieldFunc_Std1, DATE_LEN, nullptr);
178 cdo_operator_add("xyearmin", FieldFunc_Min, YEAR_LEN, nullptr);
179 cdo_operator_add("xyearmax", FieldFunc_Max, YEAR_LEN, nullptr);
180 cdo_operator_add("xyearsum", FieldFunc_Sum, YEAR_LEN, nullptr);
181 cdo_operator_add("xyearmean", FieldFunc_Mean, YEAR_LEN, nullptr);
182 cdo_operator_add("xyearavg", FieldFunc_Avg, YEAR_LEN, nullptr);
183 cdo_operator_add("xyearvar", FieldFunc_Var, YEAR_LEN, nullptr);
184 cdo_operator_add("xyearvar1", FieldFunc_Var1, YEAR_LEN, nullptr);
185 cdo_operator_add("xyearstd", FieldFunc_Std, YEAR_LEN, nullptr);
186 cdo_operator_add("xyearstd1", FieldFunc_Std1, YEAR_LEN, nullptr);
187 cdo_operator_add("xmonmin", FieldFunc_Min, MON_LEN, nullptr);
188 cdo_operator_add("xmonmax", FieldFunc_Max, MON_LEN, nullptr);
189 cdo_operator_add("xmonsum", FieldFunc_Sum, MON_LEN, nullptr);
190 cdo_operator_add("xmonmean", FieldFunc_Mean, MON_LEN, nullptr);
191 cdo_operator_add("xmonavg", FieldFunc_Avg, MON_LEN, nullptr);
192 cdo_operator_add("xmonvar", FieldFunc_Var, MON_LEN, nullptr);
193 cdo_operator_add("xmonvar1", FieldFunc_Var1, MON_LEN, nullptr);
194 cdo_operator_add("xmonstd", FieldFunc_Std, MON_LEN, nullptr);
195 cdo_operator_add("xmonstd1", FieldFunc_Std1, MON_LEN, nullptr);
196 // clang-format on
197 }
198
199 void *
XTimstat(void * process)200 XTimstat(void *process)
201 {
202 TimeStat timestat_date = TimeStat::MEAN;
203 int64_t vdate = 0, vdate0 = 0;
204 int vtime = 0, vtime0 = 0;
205 CdoStreamID streamID3 = CDO_STREAM_UNDEF;
206 int vlistID3, taxisID3 = -1;
207 bool lvfrac = false;
208 char indate1[DATE_LEN + 1], indate2[DATE_LEN + 1];
209 double vfrac = 1;
210
211 cdo_initialize(process);
212
213 addOperators();
214
215 const auto operatorID = cdo_operator_id();
216 auto operfunc = cdo_operator_f1(operatorID);
217 const auto comparelen = cdo_operator_f2(operatorID);
218
219 const auto lmean = (operfunc == FieldFunc_Mean || operfunc == FieldFunc_Avg);
220 const auto lstd = (operfunc == FieldFunc_Std || operfunc == FieldFunc_Std1);
221 auto lvarstd = (lstd || operfunc == FieldFunc_Var || operfunc == FieldFunc_Var1);
222 const auto lvars2 = lvarstd;
223 int divisor = (operfunc == FieldFunc_Std1 || operfunc == FieldFunc_Var1);
224
225 auto field2_stdvar_func = lstd ? field2_std : field2_var;
226 auto fieldc_stdvar_func = lstd ? fieldc_std : fieldc_var;
227
228 if (operfunc == FieldFunc_Mean)
229 {
230 const auto oargc = cdo_operator_argc();
231 if (oargc == 1)
232 {
233 lvfrac = true;
234 vfrac = parameter_to_double(cdo_operator_argv(0));
235 if (Options::cdoVerbose) cdo_print("Set vfrac to %g", vfrac);
236 if (vfrac < 0 || vfrac > 1) cdo_abort("vfrac out of range!");
237 }
238 else if (oargc > 1)
239 cdo_abort("Too many arguments!");
240 }
241 else
242 {
243 operator_check_argc(0);
244 }
245
246 const int cmplen = DATE_LEN - comparelen;
247
248 #ifdef USE_CDI_STREAM
249 const auto streamID1 = streamOpenRead(cdo_get_stream_name(0));
250 auto vlistID1 = streamInqVlist(streamID1);
251 #else
252 const auto streamID1 = cdo_open_read(0);
253 auto vlistID1 = cdo_stream_inq_vlist(streamID1);
254 #endif
255
256 const auto vlistID2 = vlistDuplicate(vlistID1);
257 vlist_define_timestep_type(vlistID2, operfunc);
258
259 if (cmplen == 0) vlistDefNtsteps(vlistID2, 1);
260
261 const auto taxisID1 = vlistInqTaxis(vlistID1);
262 const auto taxisID2 = taxisDuplicate(taxisID1);
263 taxisWithBounds(taxisID2);
264 if (taxisInqType(taxisID2) == TAXIS_FORECAST) taxisDefType(taxisID2, TAXIS_RELATIVE);
265 vlistDefTaxis(vlistID2, taxisID2);
266
267 const auto nvars = vlistNvars(vlistID1);
268
269 vlistSetFrequency(vlistID2, comparelen);
270
271 const auto streamID2 = cdo_open_write(1);
272 cdo_def_vlist(streamID2, vlistID2);
273
274 if (Options::cdoDiag)
275 {
276 char filename[8192];
277 strcpy(filename, cdo_operator_name(operatorID));
278 strcat(filename, "_");
279 strcat(filename, cdo_get_stream_name(1));
280 streamID3 = cdo_open_write(filename);
281
282 vlistID3 = vlistDuplicate(vlistID1);
283
284 for (int varID = 0; varID < nvars; ++varID)
285 {
286 vlistDefVarDatatype(vlistID3, varID, CDI_DATATYPE_INT32);
287 vlistDefVarMissval(vlistID3, varID, -1);
288 cdiDefKeyString(vlistID3, varID, CDI_KEY_UNITS, "");
289 vlistDefVarAddoffset(vlistID3, varID, 0);
290 vlistDefVarScalefactor(vlistID3, varID, 1);
291 }
292
293 taxisID3 = taxisDuplicate(taxisID1);
294 taxisWithBounds(taxisID3);
295 vlistDefTaxis(vlistID3, taxisID3);
296
297 cdo_def_vlist(streamID3, vlistID3);
298 }
299
300 DateTimeList dtlist;
301 dtlist.set_stat(timestat_date);
302 dtlist.set_calendar(taxisInqCalendar(taxisID1));
303
304 int FIELD_MEMTYPE = 0;
305 if (operfunc == FieldFunc_Mean && Options::CDO_Memtype == MemType::Float) FIELD_MEMTYPE = FIELD_FLT;
306 FieldVector3D input_vars(2);
307 fields_from_vlist(vlistID1, input_vars[0], FIELD_VEC | FIELD_MEMTYPE);
308 fields_from_vlist(vlistID1, input_vars[1], FIELD_VEC | FIELD_MEMTYPE);
309 FieldVector2D samp1, vars1, vars2;
310 fields_from_vlist(vlistID1, samp1);
311 fields_from_vlist(vlistID1, vars1, FIELD_VEC);
312 if (lvars2) fields_from_vlist(vlistID1, vars2, FIELD_VEC);
313
314 int curFirst = 0;
315 int curSecond = 1;
316 (void) curSecond;
317
318 ReadArguments readarg(input_vars[curFirst]);
319 readarg.streamID = streamID1;
320
321 bool lparallelread = Options::CDO_Parallel_Read > 0;
322 bool ltsfirst = true;
323 cdo::Task *read_task = nullptr;
324 void *readresult = nullptr;
325
326 if (lparallelread) read_task = new cdo::Task;
327
328 int tsID = 0;
329 int otsID = 0;
330 #ifdef USE_CDI_STREAM
331 auto nrecs = streamInqTimestep(streamID1, tsID);
332 #else
333 auto nrecs = cdo_stream_inq_timestep(streamID1, tsID);
334 #endif
335 int maxrecs = nrecs;
336 std::vector<RecordInfo> recList(maxrecs);
337
338 tsID++;
339 while (true)
340 {
341 int nsets = 0;
342 while (nrecs > 0)
343 {
344 dtlist.taxis_inq_timestep(taxisID1, nsets);
345 vdate = dtlist.get_vdate(nsets);
346 vtime = dtlist.get_vtime(nsets);
347
348 if (nsets == 0) SET_DATE(indate2, vdate, vtime);
349 SET_DATE(indate1, vdate, vtime);
350
351 if (DATE_IS_NEQ(indate1, indate2, cmplen)) break;
352
353 readarg.tsIDnext = tsID;
354 readarg.nrecs = nrecs;
355 readarg.recList = recList.data();
356 readarg.vars = input_vars[curFirst];
357
358 if (ltsfirst || !lparallelread)
359 {
360 ltsfirst = false;
361 readresult = cdoReadTimestep(&readarg);
362 }
363 else
364 {
365 readresult = read_task->wait();
366 }
367
368 nrecs = *(int *) readresult;
369
370 // std::swap(curFirst, curSecond);
371
372 if (nrecs && lparallelread)
373 {
374 readarg.vars = input_vars[curFirst];
375 readarg.tsIDnext = tsID + 1;
376 read_task->start(cdoReadTimestep, &readarg);
377 }
378
379 if (nsets == 0)
380 {
381 #ifdef _OPENMP
382 #pragma omp parallel for default(none) shared(maxrecs, recList, curFirst, input_vars, vars1, samp1) if (maxrecs > 1)
383 #endif
384 for (int recID = 0; recID < maxrecs; recID++)
385 {
386 const auto varID = recList[recID].varID;
387 const auto levelID = recList[recID].levelID;
388
389 auto &rsamp1 = samp1[varID][levelID];
390 auto &rvars1 = vars1[varID][levelID];
391 auto &rinput_var = input_vars[curFirst][varID][levelID];
392
393 const auto nmiss = rinput_var.nmiss;
394
395 field_copy(rinput_var, rvars1);
396 rvars1.nmiss = nmiss;
397 if (nmiss || !rsamp1.empty())
398 {
399 const auto fieldsize = rvars1.size;
400 if (rsamp1.empty()) rsamp1.resize(fieldsize);
401
402 for (size_t i = 0; i < fieldsize; i++) rsamp1.vec_d[i] = !DBL_IS_EQUAL(rvars1.vec_d[i], rvars1.missval);
403 }
404 }
405 }
406 else
407 {
408 #ifdef _OPENMP
409 #pragma omp parallel for default(none) \
410 shared(lvarstd, nsets, maxrecs, recList, curFirst, input_vars, vars1, samp1, vars2, operfunc) if (maxrecs > 1)
411 #endif
412 for (int recID = 0; recID < maxrecs; recID++)
413 {
414 const auto varID = recList[recID].varID;
415 const auto levelID = recList[recID].levelID;
416
417 auto &rsamp1 = samp1[varID][levelID];
418 auto &rvars1 = vars1[varID][levelID];
419 auto &rinput_var = input_vars[curFirst][varID][levelID];
420
421 const auto nmiss = rinput_var.nmiss;
422
423 if (nmiss || !rsamp1.empty())
424 {
425 const auto fieldsize = rvars1.size;
426 if (rsamp1.empty()) rsamp1.resize(fieldsize, nsets);
427
428 for (size_t i = 0; i < fieldsize; i++)
429 if (!DBL_IS_EQUAL(rinput_var.vec_d[i], rvars1.missval)) rsamp1.vec_d[i]++;
430 }
431
432 // clang-format off
433 if (lvarstd) field2_sumsumq(rvars1, vars2[varID][levelID], rinput_var);
434 else field2_function(rvars1, rinput_var, operfunc);
435 // clang-format on
436 }
437 }
438
439 if (nsets == 0 && lvarstd)
440 for (int recID = 0; recID < maxrecs; recID++)
441 {
442 const auto varID = recList[recID].varID;
443 const auto levelID = recList[recID].levelID;
444
445 if (vlistInqVarTimetype(vlistID1, varID) == TIME_CONSTANT) continue;
446
447 field2_moq(vars2[varID][levelID], vars1[varID][levelID]);
448 }
449
450 vdate0 = vdate;
451 vtime0 = vtime;
452 nsets++;
453 tsID++;
454 }
455
456 if (nrecs == 0 && nsets == 0) break;
457
458 if (lmean)
459 {
460 #ifdef _OPENMP
461 #pragma omp parallel for default(none) shared(vlistID1, nsets, maxrecs, recList, vars1, samp1) if (maxrecs > 1)
462 #endif
463 for (int recID = 0; recID < maxrecs; recID++)
464 {
465 const auto varID = recList[recID].varID;
466 const auto levelID = recList[recID].levelID;
467
468 if (vlistInqVarTimetype(vlistID1, varID) == TIME_CONSTANT) continue;
469
470 if (!samp1[varID][levelID].empty())
471 field2_div(vars1[varID][levelID], samp1[varID][levelID]);
472 else
473 fieldc_div(vars1[varID][levelID], (double) nsets);
474 }
475 }
476 else if (lvarstd)
477 {
478 for (int recID = 0; recID < maxrecs; recID++)
479 {
480 const auto varID = recList[recID].varID;
481 const auto levelID = recList[recID].levelID;
482
483 if (vlistInqVarTimetype(vlistID1, varID) == TIME_CONSTANT) continue;
484
485 const auto &rsamp1 = samp1[varID][levelID];
486 auto &rvars1 = vars1[varID][levelID];
487 const auto &rvars2 = vars2[varID][levelID];
488
489 if (!rsamp1.empty())
490 field2_stdvar_func(rvars1, rvars2, rsamp1, divisor);
491 else
492 fieldc_stdvar_func(rvars1, rvars2, nsets, divisor);
493 }
494 }
495
496 if (Options::cdoVerbose)
497 cdo_print("%s %s vfrac = %g, nsets = %d", date_to_string(vdate0).c_str(), time_to_string(vtime0).c_str(), vfrac, nsets);
498
499 if (lvfrac && operfunc == FieldFunc_Mean)
500 for (int recID = 0; recID < maxrecs; recID++)
501 {
502 const auto varID = recList[recID].varID;
503 const auto levelID = recList[recID].levelID;
504 auto &rvars1 = vars1[varID][levelID];
505
506 if (vlistInqVarTimetype(vlistID1, varID) == TIME_CONSTANT) continue;
507
508 const auto missval = rvars1.missval;
509 if (!samp1[varID][levelID].empty())
510 {
511 const auto fieldsize = rvars1.size;
512 size_t irun = 0;
513 for (size_t i = 0; i < fieldsize; ++i)
514 {
515 if ((samp1[varID][levelID].vec_d[i] / nsets) < vfrac)
516 {
517 rvars1.vec_d[i] = missval;
518 irun++;
519 }
520 }
521
522 if (irun) rvars1.nmiss = field_num_miss(rvars1);
523 }
524 }
525
526 dtlist.stat_taxis_def_timestep(taxisID2, nsets);
527 cdo_def_timestep(streamID2, otsID);
528
529 if (Options::cdoDiag)
530 {
531 dtlist.stat_taxis_def_timestep(taxisID3, nsets);
532 cdo_def_timestep(streamID3, otsID);
533 }
534
535 for (int recID = 0; recID < maxrecs; recID++)
536 {
537 const auto varID = recList[recID].varID;
538 const auto levelID = recList[recID].levelID;
539 auto &rvars1 = vars1[varID][levelID];
540
541 if (otsID && vlistInqVarTimetype(vlistID1, varID) == TIME_CONSTANT) continue;
542
543 cdo_def_record(streamID2, varID, levelID);
544 cdo_write_record(streamID2, rvars1.vec_d.data(), rvars1.nmiss);
545
546 if (Options::cdoDiag)
547 {
548 if (!samp1[varID][levelID].empty())
549 {
550 cdo_def_record(streamID3, varID, levelID);
551 cdo_write_record(streamID3, samp1[varID][levelID].vec_d.data(), 0);
552 }
553 }
554 }
555
556 if (nrecs == 0) break;
557 otsID++;
558 }
559
560 if (Options::cdoDiag) cdo_stream_close(streamID3);
561 cdo_stream_close(streamID2);
562 #ifdef USE_CDI_STREAM
563 streamClose(streamID1);
564 #else
565 cdo_stream_close(streamID1);
566 #endif
567
568 if (read_task) delete read_task;
569
570 cdo_finish();
571
572 return nullptr;
573 }
574