1 /*
2   This file is part of CDO. CDO is a collection of Operators to manipulate and analyse Climate model Data.
3 
4   Author: 2010 Ralf Mueller
5 
6 */
7 
8 /*
9    This module contains the following operators:
10 
11       Consectstep  consecsum  For each timestep, the current number of
12                               onsecutive timsteps is counted
13       Consectstep  consects   For each period of consecutive timesteps, only
14                               count its length + last contributing timesteps
15 
16    =============================================================================
17    Created:  04/08/2010 11:58:01 AM
18     Author:  Ralf Mueller (ram), ralf.mueller@mpimet.mpg.de
19    Company:  Max-Planck-Institute for Meteorology
20    =============================================================================
21 */
22 
23 #include <cdi.h>
24 
25 #include "process_int.h"
26 #include "cdo_vlist.h"
27 #include "param_conversion.h"
28 
29 enum
30 {
31   CONSECSUM,
32   CONSECTS
33 };
34 
35 #define SWITCHWARN "Hit default case! This should never happen (%s).\n"
36 
37 static void
selEndOfPeriod(Field & periods,const Field & history,const Field & current,int isLastTimestep)38 selEndOfPeriod(Field &periods, const Field &history, const Field &current, int isLastTimestep)
39 {
40   auto pmissval = periods.missval;
41   auto &parray = periods.vec_d;
42   auto hmissval = history.missval;
43   const auto &harray = history.vec_d;
44   auto cmissval = current.missval;
45   const auto &carray = current.vec_d;
46 
47   auto len = gridInqSize(periods.grid);
48   if (len != gridInqSize(current.grid) || (gridInqSize(current.grid) != gridInqSize(history.grid)))
49     cdo_abort("Fields have different gridsize (%s)", __func__);
50 
51   if (!isLastTimestep)
52     {
53       if (current.nmiss || history.nmiss)
54         {
55 #ifdef _OPENMP
56 #pragma omp parallel for default(shared)
57 #endif
58           for (size_t i = 0; i < len; i++)
59             {
60               if (!DBL_IS_EQUAL(harray[i], hmissval))
61                 {
62                   if (!DBL_IS_EQUAL(carray[i], cmissval))
63                     parray[i] = (DBL_IS_EQUAL(carray[i], 0.0) && IS_NOT_EQUAL(harray[i], 0.0)) ? harray[i] : pmissval;
64                   else  // DBL_IS_EQUAL(carray[i], cmissval)
65                     parray[i] = (IS_NOT_EQUAL(harray[i], 0.0)) ? harray[i] : pmissval;
66                 }
67               else /* DBL_IS_EQUAL(harray[i], hmissval) */
68                 {
69                   parray[i] = pmissval;
70                 }
71             }
72         }
73       else
74         {
75 #ifdef _OPENMP
76 #pragma omp parallel for default(shared)
77 #endif
78           for (size_t i = 0; i < len; i++)
79             parray[i] = (DBL_IS_EQUAL(carray[i], 0.0) && IS_NOT_EQUAL(harray[i], 0.0)) ? harray[i] : pmissval;
80         }
81     }
82   else
83     {
84       if (current.nmiss)
85         {
86 #ifdef _OPENMP
87 #pragma omp parallel for default(shared)
88 #endif
89           for (size_t i = 0; i < len; i++)
90             if (!DBL_IS_EQUAL(carray[i], cmissval))
91               parray[i] = (DBL_IS_EQUAL(carray[i], 0.0)) ? pmissval : carray[i];
92             else  // DBL_IS_EQUAL(carray[i], cmissval)
93               parray[i] = pmissval;
94         }
95       else
96         {
97 #ifdef _OPENMP
98 #pragma omp parallel for default(shared)
99 #endif
100           for (size_t i = 0; i < len; i++) parray[i] = DBL_IS_EQUAL(carray[i], 0.0) ? pmissval : carray[i];
101         }
102     }
103 
104   periods.nmiss = varray_num_mv(len, parray, pmissval);
105 }
106 
107 void *
Consecstat(void * process)108 Consecstat(void *process)
109 {
110   int64_t vdate = 0, histvdate = 0;
111   int vtime = 0, histvtime = 0;
112   double refval = 0.0;
113 
114   cdo_initialize(process);
115   cdo_operator_add("consecsum", CONSECSUM, 0, "refval");
116   cdo_operator_add("consects", CONSECTS, 0, nullptr);
117   const int operatorID = cdo_operator_id();
118 
119   if (operatorID == CONSECSUM)
120     if (cdo_operator_argc() > 0) refval = parameter_to_double(cdo_operator_argv(0));
121 
122   const auto istreamID = cdo_open_read(0);
123 
124   const auto ivlistID = cdo_stream_inq_vlist(istreamID);
125   const auto itaxisID = vlistInqTaxis(ivlistID);
126   const auto ovlistID = vlistDuplicate(ivlistID);
127   const auto otaxisID = taxisDuplicate(itaxisID);
128   vlistDefTaxis(ovlistID, otaxisID);
129 
130   VarList varList;
131   varListInit(varList, ovlistID);
132 
133   Field field;
134   field.resize(vlistGridsizeMax(ovlistID));
135 
136   const auto nvars = vlistNvars(ivlistID);
137   FieldVector2D vars, hist, periods;
138   fields_from_vlist(ivlistID, vars, FIELD_VEC, 0);
139   if (operatorID == CONSECTS)
140     {
141       fields_from_vlist(ivlistID, hist, FIELD_VEC);
142       fields_from_vlist(ivlistID, periods, FIELD_VEC);
143     }
144 
145   for (int varID = 0; varID < nvars; varID++) cdiDefKeyString(ovlistID, varID, CDI_KEY_UNITS, "steps");  // TODO
146 
147   const auto ostreamID = cdo_open_write(1);
148   cdo_def_vlist(ostreamID, ovlistID);
149 
150   int itsID = 0;
151   int otsID = 0;
152   while (true)
153     {
154       const auto nrecs = cdo_stream_inq_timestep(istreamID, itsID);
155       if (nrecs == 0) break;
156 
157       vdate = taxisInqVdate(itaxisID);
158       vtime = taxisInqVtime(itaxisID);
159       switch (operatorID)
160         {
161         case CONSECSUM:
162           taxisDefVdate(otaxisID, vdate);
163           taxisDefVtime(otaxisID, vtime);
164           cdo_def_timestep(ostreamID, otsID);
165           break;
166         case CONSECTS:
167           if (itsID != 0)
168             {
169               taxisDefVdate(otaxisID, histvdate);
170               taxisDefVtime(otaxisID, histvtime);
171               cdo_def_timestep(ostreamID, otsID - 1);
172             }
173           break;
174         default: printf(SWITCHWARN, __func__); break;
175         }
176 
177       for (int recID = 0; recID < nrecs; recID++)
178         {
179           int varID, levelID;
180           cdo_inq_record(istreamID, &varID, &levelID);
181           cdo_read_record(istreamID, field.vec_d.data(), &field.nmiss);
182           field.grid = varList[varID].gridID;
183           field.size = varList[varID].gridsize;
184           field.missval = varList[varID].missval;
185 
186           field2_sumtr(vars[varID][levelID], field, refval);
187 
188           switch (operatorID)
189             {
190             case CONSECSUM:
191               cdo_def_record(ostreamID, varID, levelID);
192               cdo_write_record(ostreamID, vars[varID][levelID].vec_d.data(), vars[varID][levelID].nmiss);
193               break;
194             case CONSECTS:
195               if (itsID != 0)
196                 {
197                   selEndOfPeriod(periods[varID][levelID], hist[varID][levelID], vars[varID][levelID], false);
198                   cdo_def_record(ostreamID, varID, levelID);
199                   cdo_write_record(ostreamID, periods[varID][levelID].vec_d.data(), periods[varID][levelID].nmiss);
200                 }
201 #ifdef _OPENMP
202 #pragma omp parallel for default(shared) schedule(static)
203               for (size_t i = 0; i < vars[varID][levelID].size; i++) hist[varID][levelID].vec_d[i] = vars[varID][levelID].vec_d[i];
204 #else
205               hist[varID][levelID].vec_d = vars[varID][levelID].vec_d;
206 #endif
207               break;
208             default: printf(SWITCHWARN, __func__); break;
209             }
210         }
211 
212       histvdate = vdate;
213       histvtime = vtime;
214       itsID++;
215       otsID++;
216     }
217 
218   if (operatorID == CONSECTS) /* Save the last timestep */
219     {
220       taxisDefVdate(otaxisID, vdate);
221       taxisDefVtime(otaxisID, vtime);
222       cdo_def_timestep(ostreamID, otsID - 1);
223 
224       for (int varID = 0; varID < nvars; varID++)
225         {
226           const auto nlevels = varList[varID].nlevels;
227           for (int levelID = 0; levelID < nlevels; levelID++)
228             {
229               selEndOfPeriod(periods[varID][levelID], hist[varID][levelID], vars[varID][levelID], true);
230               cdo_def_record(ostreamID, varID, levelID);
231               cdo_write_record(ostreamID, periods[varID][levelID].vec_d.data(), periods[varID][levelID].nmiss);
232             }
233         }
234     }
235 
236   cdo_stream_close(istreamID);
237   cdo_stream_close(ostreamID);
238 
239   cdo_finish();
240 
241   return nullptr;
242 }
243