1 /*
2   This file is part of CDO. CDO is a collection of Operators to manipulate and analyse Climate model Data.
3 
4   Author: Uwe Schulzweida
5 
6 */
7 
8 /*
9    This module contains the following operators:
10 
11       Timstat    timmin          Time minimum
12       Timstat    timmax          Time maximum
13       Timstat    timsum          Time sum
14       Timstat    timmean         Time mean
15       Timstat    timavg          Time average
16       Timstat    timvar          Time variance
17       Timstat    timvar1         Time variance [Normalize by (n-1)]
18       Timstat    timstd          Time standard deviation
19       Timstat    timstd1         Time standard deviation [Normalize by (n-1)]
20       Hourstat   hourmin         Hourly minimum
21       Hourstat   hourmax         Hourly maximum
22       Hourstat   hoursum         Hourly sum
23       Hourstat   hourmean        Hourly mean
24       Hourstat   houravg         Hourly average
25       Hourstat   hourvar         Hourly variance
26       Hourstat   hourvar1        Hourly variance [Normalize by (n-1)]
27       Hourstat   hourstd         Hourly standard deviation
28       Hourstat   hourstd1        Hourly standard deviation [Normalize by (n-1)]
29       Daystat    daymin          Daily minimum
30       Daystat    daymax          Daily maximum
31       Daystat    daysum          Daily sum
32       Daystat    daymean         Daily mean
33       Daystat    dayavg          Daily average
34       Daystat    dayvar          Daily variance
35       Daystat    dayvar1         Daily variance [Normalize by (n-1)]
36       Daystat    daystd          Daily standard deviation
37       Daystat    daystd1         Daily standard deviation [Normalize by (n-1)]
38       Monstat    monmin          Monthly minimum
39       Monstat    monmax          Monthly maximum
40       Monstat    monsum          Monthly sum
41       Monstat    monmean         Monthly mean
42       Monstat    monavg          Monthly average
43       Monstat    monvar          Monthly variance
44       Monstat    monvar1         Monthly variance [Normalize by (n-1)]
45       Monstat    monstd          Monthly standard deviation
46       Monstat    monstd1         Monthly standard deviation [Normalize by (n-1)]
47       Yearstat   yearmin         Yearly minimum
48       Yearstat   yearmax         Yearly maximum
49       Yearstat   yearsum         Yearly sum
50       Yearstat   yearmean        Yearly mean
51       Yearstat   yearavg         Yearly average
52       Yearstat   yearvar         Yearly variance
53       Yearstat   yearvar1        Yearly variance [Normalize by (n-1)]
54       Yearstat   yearstd         Yearly standard deviation
55       Yearstat   yearstd1        Yearly standard deviation [Normalize by (n-1)]
56 */
57 
58 #include <cdi.h>
59 
60 #include <utility>
61 
62 #include "cdo_options.h"
63 #include "cdo_vlist.h"
64 #include "process_int.h"
65 #include "cdo_task.h"
66 #include "timer.h"
67 #include "datetime.h"
68 #include "printinfo.h"
69 #include "util_date.h"
70 #include "param_conversion.h"
71 
72 #define USE_CDI_STREAM 1
73 
74 enum
75 {
76   HOUR_LEN = 4,
77   DAY_LEN = 6,
78   MON_LEN = 8,
79   YEAR_LEN = 10
80 };
81 
82 struct ReadArguments
83 {
84   int tsIDnext;
85 #ifdef USE_CDI_STREAM
86   int streamID;
87 #else
88   CdoStreamID streamID;
89 #endif
90   int nrecs;
91   RecordInfo *recList;
92   FieldVector2D &vars;
93 
ReadArgumentsReadArguments94   ReadArguments(FieldVector2D &input_vars) : tsIDnext(0), nrecs(0), recList(nullptr), vars(input_vars){};
95 };
96 
97 static int num_recs = 0;
98 
99 static void *
cdoReadTimestep(void * rarg)100 cdoReadTimestep(void *rarg)
101 {
102   ReadArguments *readarg = (ReadArguments *) rarg;
103   auto &input_vars = readarg->vars;
104   RecordInfo *recList = readarg->recList;
105   const auto streamID = readarg->streamID;
106   int tsIDnext = readarg->tsIDnext;
107   int nrecs = readarg->nrecs;
108 
109   // timer_start(timer_read);
110 
111   for (int recID = 0; recID < nrecs; ++recID)
112     {
113       int varID, levelID;
114 #ifdef USE_CDI_STREAM
115       streamInqRecord(streamID, &varID, &levelID);
116 #else
117       cdo_inq_record(streamID, &varID, &levelID);
118 #endif
119 
120       if (tsIDnext == 1 && recList)
121         {
122           recList[recID].varID = varID;
123           recList[recID].levelID = levelID;
124         }
125 
126       size_t nmiss;
127 
128 #ifdef USE_CDI_STREAM
129       if (Options::CDO_Memtype == MemType::Float)
130         streamReadRecordF(streamID, input_vars[varID][levelID].vec_f.data(), &nmiss);
131       else
132         streamReadRecord(streamID, input_vars[varID][levelID].vec_d.data(), &nmiss);
133 #else
134       if (Options::CDO_Memtype == MemType::Float)
135         cdo_read_record_f(streamID, input_vars[varID][levelID].vec_f.data(), &nmiss);
136       else
137         cdo_read_record(streamID, input_vars[varID][levelID].vec_d.data(), &nmiss);
138 #endif
139       input_vars[varID][levelID].nmiss = nmiss;
140     }
141 
142     // timer_stop(timer_read);
143 
144 #ifdef USE_CDI_STREAM
145   num_recs = streamInqTimestep(streamID, tsIDnext);
146 #else
147   num_recs = cdo_stream_inq_timestep(streamID, tsIDnext);
148 #endif
149 
150   return ((void *) &num_recs);
151 }
152 
153 static void
vlistSetFrequency(int vlistID,int comparelen)154 vlistSetFrequency(int vlistID, int comparelen)
155 {
156   const char *freq = nullptr;
157   // clang-format off
158   if      (comparelen == DAY_LEN)  freq = "day";
159   else if (comparelen == MON_LEN)  freq = "mon";
160   else if (comparelen == YEAR_LEN) freq = "year";
161   // clang-format on
162   if (freq) cdiDefAttTxt(vlistID, CDI_GLOBAL, "frequency", (int) strlen(freq), freq);
163 }
164 
165 static void
addOperators(void)166 addOperators(void)
167 {
168   // clang-format off
169   cdo_operator_add("xtimmin",    FieldFunc_Min,   DATE_LEN, nullptr);
170   cdo_operator_add("xtimmax",    FieldFunc_Max,   DATE_LEN, nullptr);
171   cdo_operator_add("xtimsum",    FieldFunc_Sum,   DATE_LEN, nullptr);
172   cdo_operator_add("xtimmean",   FieldFunc_Mean,  DATE_LEN, nullptr);
173   cdo_operator_add("xtimavg",    FieldFunc_Avg,   DATE_LEN, nullptr);
174   cdo_operator_add("xtimvar",    FieldFunc_Var,   DATE_LEN, nullptr);
175   cdo_operator_add("xtimvar1",   FieldFunc_Var1,  DATE_LEN, nullptr);
176   cdo_operator_add("xtimstd",    FieldFunc_Std,   DATE_LEN, nullptr);
177   cdo_operator_add("xtimstd1",   FieldFunc_Std1,  DATE_LEN, nullptr);
178   cdo_operator_add("xyearmin",   FieldFunc_Min,   YEAR_LEN, nullptr);
179   cdo_operator_add("xyearmax",   FieldFunc_Max,   YEAR_LEN, nullptr);
180   cdo_operator_add("xyearsum",   FieldFunc_Sum,   YEAR_LEN, nullptr);
181   cdo_operator_add("xyearmean",  FieldFunc_Mean,  YEAR_LEN, nullptr);
182   cdo_operator_add("xyearavg",   FieldFunc_Avg,   YEAR_LEN, nullptr);
183   cdo_operator_add("xyearvar",   FieldFunc_Var,   YEAR_LEN, nullptr);
184   cdo_operator_add("xyearvar1",  FieldFunc_Var1,  YEAR_LEN, nullptr);
185   cdo_operator_add("xyearstd",   FieldFunc_Std,   YEAR_LEN, nullptr);
186   cdo_operator_add("xyearstd1",  FieldFunc_Std1,  YEAR_LEN, nullptr);
187   cdo_operator_add("xmonmin",    FieldFunc_Min,   MON_LEN, nullptr);
188   cdo_operator_add("xmonmax",    FieldFunc_Max,   MON_LEN, nullptr);
189   cdo_operator_add("xmonsum",    FieldFunc_Sum,   MON_LEN, nullptr);
190   cdo_operator_add("xmonmean",   FieldFunc_Mean,  MON_LEN, nullptr);
191   cdo_operator_add("xmonavg",    FieldFunc_Avg,   MON_LEN, nullptr);
192   cdo_operator_add("xmonvar",    FieldFunc_Var,   MON_LEN, nullptr);
193   cdo_operator_add("xmonvar1",   FieldFunc_Var1,  MON_LEN, nullptr);
194   cdo_operator_add("xmonstd",    FieldFunc_Std,   MON_LEN, nullptr);
195   cdo_operator_add("xmonstd1",   FieldFunc_Std1,  MON_LEN, nullptr);
196   // clang-format on
197 }
198 
199 void *
XTimstat(void * process)200 XTimstat(void *process)
201 {
202   TimeStat timestat_date = TimeStat::MEAN;
203   int64_t vdate = 0, vdate0 = 0;
204   int vtime = 0, vtime0 = 0;
205   CdoStreamID streamID3 = CDO_STREAM_UNDEF;
206   int vlistID3, taxisID3 = -1;
207   bool lvfrac = false;
208   char indate1[DATE_LEN + 1], indate2[DATE_LEN + 1];
209   double vfrac = 1;
210 
211   cdo_initialize(process);
212 
213   addOperators();
214 
215   const auto operatorID = cdo_operator_id();
216   auto operfunc = cdo_operator_f1(operatorID);
217   const auto comparelen = cdo_operator_f2(operatorID);
218 
219   const auto lmean = (operfunc == FieldFunc_Mean || operfunc == FieldFunc_Avg);
220   const auto lstd = (operfunc == FieldFunc_Std || operfunc == FieldFunc_Std1);
221   auto lvarstd = (lstd || operfunc == FieldFunc_Var || operfunc == FieldFunc_Var1);
222   const auto lvars2 = lvarstd;
223   int divisor = (operfunc == FieldFunc_Std1 || operfunc == FieldFunc_Var1);
224 
225   auto field2_stdvar_func = lstd ? field2_std : field2_var;
226   auto fieldc_stdvar_func = lstd ? fieldc_std : fieldc_var;
227 
228   if (operfunc == FieldFunc_Mean)
229     {
230       const auto oargc = cdo_operator_argc();
231       if (oargc == 1)
232         {
233           lvfrac = true;
234           vfrac = parameter_to_double(cdo_operator_argv(0));
235           if (Options::cdoVerbose) cdo_print("Set vfrac to %g", vfrac);
236           if (vfrac < 0 || vfrac > 1) cdo_abort("vfrac out of range!");
237         }
238       else if (oargc > 1)
239         cdo_abort("Too many arguments!");
240     }
241   else
242     {
243       operator_check_argc(0);
244     }
245 
246   const int cmplen = DATE_LEN - comparelen;
247 
248 #ifdef USE_CDI_STREAM
249   const auto streamID1 = streamOpenRead(cdo_get_stream_name(0));
250   auto vlistID1 = streamInqVlist(streamID1);
251 #else
252   const auto streamID1 = cdo_open_read(0);
253   auto vlistID1 = cdo_stream_inq_vlist(streamID1);
254 #endif
255 
256   const auto vlistID2 = vlistDuplicate(vlistID1);
257   vlist_define_timestep_type(vlistID2, operfunc);
258 
259   if (cmplen == 0) vlistDefNtsteps(vlistID2, 1);
260 
261   const auto taxisID1 = vlistInqTaxis(vlistID1);
262   const auto taxisID2 = taxisDuplicate(taxisID1);
263   taxisWithBounds(taxisID2);
264   if (taxisInqType(taxisID2) == TAXIS_FORECAST) taxisDefType(taxisID2, TAXIS_RELATIVE);
265   vlistDefTaxis(vlistID2, taxisID2);
266 
267   const auto nvars = vlistNvars(vlistID1);
268 
269   vlistSetFrequency(vlistID2, comparelen);
270 
271   const auto streamID2 = cdo_open_write(1);
272   cdo_def_vlist(streamID2, vlistID2);
273 
274   if (Options::cdoDiag)
275     {
276       char filename[8192];
277       strcpy(filename, cdo_operator_name(operatorID));
278       strcat(filename, "_");
279       strcat(filename, cdo_get_stream_name(1));
280       streamID3 = cdo_open_write(filename);
281 
282       vlistID3 = vlistDuplicate(vlistID1);
283 
284       for (int varID = 0; varID < nvars; ++varID)
285         {
286           vlistDefVarDatatype(vlistID3, varID, CDI_DATATYPE_INT32);
287           vlistDefVarMissval(vlistID3, varID, -1);
288           cdiDefKeyString(vlistID3, varID, CDI_KEY_UNITS, "");
289           vlistDefVarAddoffset(vlistID3, varID, 0);
290           vlistDefVarScalefactor(vlistID3, varID, 1);
291         }
292 
293       taxisID3 = taxisDuplicate(taxisID1);
294       taxisWithBounds(taxisID3);
295       vlistDefTaxis(vlistID3, taxisID3);
296 
297       cdo_def_vlist(streamID3, vlistID3);
298     }
299 
300   DateTimeList dtlist;
301   dtlist.set_stat(timestat_date);
302   dtlist.set_calendar(taxisInqCalendar(taxisID1));
303 
304   int FIELD_MEMTYPE = 0;
305   if (operfunc == FieldFunc_Mean && Options::CDO_Memtype == MemType::Float) FIELD_MEMTYPE = FIELD_FLT;
306   FieldVector3D input_vars(2);
307   fields_from_vlist(vlistID1, input_vars[0], FIELD_VEC | FIELD_MEMTYPE);
308   fields_from_vlist(vlistID1, input_vars[1], FIELD_VEC | FIELD_MEMTYPE);
309   FieldVector2D samp1, vars1, vars2;
310   fields_from_vlist(vlistID1, samp1);
311   fields_from_vlist(vlistID1, vars1, FIELD_VEC);
312   if (lvars2) fields_from_vlist(vlistID1, vars2, FIELD_VEC);
313 
314   int curFirst = 0;
315   int curSecond = 1;
316   (void) curSecond;
317 
318   ReadArguments readarg(input_vars[curFirst]);
319   readarg.streamID = streamID1;
320 
321   bool lparallelread = Options::CDO_Parallel_Read > 0;
322   bool ltsfirst = true;
323   cdo::Task *read_task = nullptr;
324   void *readresult = nullptr;
325 
326   if (lparallelread) read_task = new cdo::Task;
327 
328   int tsID = 0;
329   int otsID = 0;
330 #ifdef USE_CDI_STREAM
331   auto nrecs = streamInqTimestep(streamID1, tsID);
332 #else
333   auto nrecs = cdo_stream_inq_timestep(streamID1, tsID);
334 #endif
335   int maxrecs = nrecs;
336   std::vector<RecordInfo> recList(maxrecs);
337 
338   tsID++;
339   while (true)
340     {
341       int nsets = 0;
342       while (nrecs > 0)
343         {
344           dtlist.taxis_inq_timestep(taxisID1, nsets);
345           vdate = dtlist.get_vdate(nsets);
346           vtime = dtlist.get_vtime(nsets);
347 
348           if (nsets == 0) SET_DATE(indate2, vdate, vtime);
349           SET_DATE(indate1, vdate, vtime);
350 
351           if (DATE_IS_NEQ(indate1, indate2, cmplen)) break;
352 
353           readarg.tsIDnext = tsID;
354           readarg.nrecs = nrecs;
355           readarg.recList = recList.data();
356           readarg.vars = input_vars[curFirst];
357 
358           if (ltsfirst || !lparallelread)
359             {
360               ltsfirst = false;
361               readresult = cdoReadTimestep(&readarg);
362             }
363           else
364             {
365               readresult = read_task->wait();
366             }
367 
368           nrecs = *(int *) readresult;
369 
370           // std::swap(curFirst, curSecond);
371 
372           if (nrecs && lparallelread)
373             {
374               readarg.vars = input_vars[curFirst];
375               readarg.tsIDnext = tsID + 1;
376               read_task->start(cdoReadTimestep, &readarg);
377             }
378 
379           if (nsets == 0)
380             {
381 #ifdef _OPENMP
382 #pragma omp parallel for default(none) shared(maxrecs, recList, curFirst, input_vars, vars1, samp1) if (maxrecs > 1)
383 #endif
384               for (int recID = 0; recID < maxrecs; recID++)
385                 {
386                   const auto varID = recList[recID].varID;
387                   const auto levelID = recList[recID].levelID;
388 
389                   auto &rsamp1 = samp1[varID][levelID];
390                   auto &rvars1 = vars1[varID][levelID];
391                   auto &rinput_var = input_vars[curFirst][varID][levelID];
392 
393                   const auto nmiss = rinput_var.nmiss;
394 
395                   field_copy(rinput_var, rvars1);
396                   rvars1.nmiss = nmiss;
397                   if (nmiss || !rsamp1.empty())
398                     {
399                       const auto fieldsize = rvars1.size;
400                       if (rsamp1.empty()) rsamp1.resize(fieldsize);
401 
402                       for (size_t i = 0; i < fieldsize; i++) rsamp1.vec_d[i] = !DBL_IS_EQUAL(rvars1.vec_d[i], rvars1.missval);
403                     }
404                 }
405             }
406           else
407             {
408 #ifdef _OPENMP
409 #pragma omp parallel for default(none) \
410     shared(lvarstd, nsets, maxrecs, recList, curFirst, input_vars, vars1, samp1, vars2, operfunc) if (maxrecs > 1)
411 #endif
412               for (int recID = 0; recID < maxrecs; recID++)
413                 {
414                   const auto varID = recList[recID].varID;
415                   const auto levelID = recList[recID].levelID;
416 
417                   auto &rsamp1 = samp1[varID][levelID];
418                   auto &rvars1 = vars1[varID][levelID];
419                   auto &rinput_var = input_vars[curFirst][varID][levelID];
420 
421                   const auto nmiss = rinput_var.nmiss;
422 
423                   if (nmiss || !rsamp1.empty())
424                     {
425                       const auto fieldsize = rvars1.size;
426                       if (rsamp1.empty()) rsamp1.resize(fieldsize, nsets);
427 
428                       for (size_t i = 0; i < fieldsize; i++)
429                         if (!DBL_IS_EQUAL(rinput_var.vec_d[i], rvars1.missval)) rsamp1.vec_d[i]++;
430                     }
431 
432                   // clang-format off
433                   if (lvarstd) field2_sumsumq(rvars1, vars2[varID][levelID], rinput_var);
434                   else         field2_function(rvars1, rinput_var, operfunc);
435                   // clang-format on
436                 }
437             }
438 
439           if (nsets == 0 && lvarstd)
440             for (int recID = 0; recID < maxrecs; recID++)
441               {
442                 const auto varID = recList[recID].varID;
443                 const auto levelID = recList[recID].levelID;
444 
445                 if (vlistInqVarTimetype(vlistID1, varID) == TIME_CONSTANT) continue;
446 
447                 field2_moq(vars2[varID][levelID], vars1[varID][levelID]);
448               }
449 
450           vdate0 = vdate;
451           vtime0 = vtime;
452           nsets++;
453           tsID++;
454         }
455 
456       if (nrecs == 0 && nsets == 0) break;
457 
458       if (lmean)
459         {
460 #ifdef _OPENMP
461 #pragma omp parallel for default(none) shared(vlistID1, nsets, maxrecs, recList, vars1, samp1) if (maxrecs > 1)
462 #endif
463           for (int recID = 0; recID < maxrecs; recID++)
464             {
465               const auto varID = recList[recID].varID;
466               const auto levelID = recList[recID].levelID;
467 
468               if (vlistInqVarTimetype(vlistID1, varID) == TIME_CONSTANT) continue;
469 
470               if (!samp1[varID][levelID].empty())
471                 field2_div(vars1[varID][levelID], samp1[varID][levelID]);
472               else
473                 fieldc_div(vars1[varID][levelID], (double) nsets);
474             }
475         }
476       else if (lvarstd)
477         {
478           for (int recID = 0; recID < maxrecs; recID++)
479             {
480               const auto varID = recList[recID].varID;
481               const auto levelID = recList[recID].levelID;
482 
483               if (vlistInqVarTimetype(vlistID1, varID) == TIME_CONSTANT) continue;
484 
485               const auto &rsamp1 = samp1[varID][levelID];
486               auto &rvars1 = vars1[varID][levelID];
487               const auto &rvars2 = vars2[varID][levelID];
488 
489               if (!rsamp1.empty())
490                 field2_stdvar_func(rvars1, rvars2, rsamp1, divisor);
491               else
492                 fieldc_stdvar_func(rvars1, rvars2, nsets, divisor);
493             }
494         }
495 
496       if (Options::cdoVerbose)
497         cdo_print("%s %s  vfrac = %g, nsets = %d", date_to_string(vdate0).c_str(), time_to_string(vtime0).c_str(), vfrac, nsets);
498 
499       if (lvfrac && operfunc == FieldFunc_Mean)
500         for (int recID = 0; recID < maxrecs; recID++)
501           {
502             const auto varID = recList[recID].varID;
503             const auto levelID = recList[recID].levelID;
504             auto &rvars1 = vars1[varID][levelID];
505 
506             if (vlistInqVarTimetype(vlistID1, varID) == TIME_CONSTANT) continue;
507 
508             const auto missval = rvars1.missval;
509             if (!samp1[varID][levelID].empty())
510               {
511                 const auto fieldsize = rvars1.size;
512                 size_t irun = 0;
513                 for (size_t i = 0; i < fieldsize; ++i)
514                   {
515                     if ((samp1[varID][levelID].vec_d[i] / nsets) < vfrac)
516                       {
517                         rvars1.vec_d[i] = missval;
518                         irun++;
519                       }
520                   }
521 
522                 if (irun) rvars1.nmiss = field_num_miss(rvars1);
523               }
524           }
525 
526       dtlist.stat_taxis_def_timestep(taxisID2, nsets);
527       cdo_def_timestep(streamID2, otsID);
528 
529       if (Options::cdoDiag)
530         {
531           dtlist.stat_taxis_def_timestep(taxisID3, nsets);
532           cdo_def_timestep(streamID3, otsID);
533         }
534 
535       for (int recID = 0; recID < maxrecs; recID++)
536         {
537           const auto varID = recList[recID].varID;
538           const auto levelID = recList[recID].levelID;
539           auto &rvars1 = vars1[varID][levelID];
540 
541           if (otsID && vlistInqVarTimetype(vlistID1, varID) == TIME_CONSTANT) continue;
542 
543           cdo_def_record(streamID2, varID, levelID);
544           cdo_write_record(streamID2, rvars1.vec_d.data(), rvars1.nmiss);
545 
546           if (Options::cdoDiag)
547             {
548               if (!samp1[varID][levelID].empty())
549                 {
550                   cdo_def_record(streamID3, varID, levelID);
551                   cdo_write_record(streamID3, samp1[varID][levelID].vec_d.data(), 0);
552                 }
553             }
554         }
555 
556       if (nrecs == 0) break;
557       otsID++;
558     }
559 
560   if (Options::cdoDiag) cdo_stream_close(streamID3);
561   cdo_stream_close(streamID2);
562 #ifdef USE_CDI_STREAM
563   streamClose(streamID1);
564 #else
565   cdo_stream_close(streamID1);
566 #endif
567 
568   if (read_task) delete read_task;
569 
570   cdo_finish();
571 
572   return nullptr;
573 }
574