1 /*
2   This file is part of CDO. CDO is a collection of Operators to manipulate and analyse Climate model Data.
3 
4   Author: Uwe Schulzweida
5 
6 */
7 
8 #ifdef HAVE_CONFIG_H
9 #include "config.h" /* HAVE_NC4HDF5_THREADSAFE */
10 #endif
11 
12 #include <sys/stat.h> /* stat */
13 #include <cdi.h>
14 
15 #include "fileStream.h"
16 
17 #include "cdi_lockedIO.h"
18 #include "cdo_options.h"
19 #include "cdo_output.h"
20 #include "cdo_default_values.h"
21 #include "cdo_history.h"
22 #include "process.h"
23 
24 #include "timer.h"
25 #include "commandline.h"
26 
27 bool FileStream::TimerEnabled = false;
28 #ifndef HAVE_NC4HDF5_THREADSAFE
29 static auto inputFileTypeIsNetCDF4 = false;
30 #endif
31 
FileStream(const std::string & p_fileName)32 FileStream::FileStream(const std::string &p_fileName) : m_filename(p_fileName)
33 {
34   m_name = p_fileName;
35   m_fileID = CDI_UNDEFID;
36 }
37 
38 int
open_read()39 FileStream::open_read()
40 {
41   open_lock();
42 
43   const auto fileID = streamOpenRead(m_filename.c_str());
44   if (fileID < 0) cdi_open_error(fileID, "Open failed on >%s<", m_filename.c_str());
45   isopen = true;
46 
47   m_filetype = streamInqFiletype(fileID);
48   if (CdoDefault::FileType == CDI_UNDEFID) CdoDefault::FileType = m_filetype;
49   m_fileID = fileID;
50 
51 #ifndef HAVE_NC4HDF5_THREADSAFE
52   if (m_filetype == CDI_FILETYPE_NC4 || m_filetype == CDI_FILETYPE_NC4C) inputFileTypeIsNetCDF4 = true;
53 #endif
54 
55   Debug(FILE_STREAM, "Set number of worker to %d", Options::numStreamWorker);
56   if (Options::numStreamWorker > 0) streamDefNumWorker(fileID, Options::numStreamWorker);
57 
58   open_unlock();
59 
60   return fileID;
61 }
62 
63 int
open_write(int p_filetype)64 FileStream::open_write(int p_filetype)
65 {
66   if (Options::cdoInteractive)
67     {
68       struct stat stbuf;
69       const auto rstatus = stat(m_name.c_str(), &stbuf);
70       // If permanent file already exists, query user whether to overwrite or exit
71       if (rstatus != -1) query_user_exit(m_name.c_str());
72     }
73   if (p_filetype == CDI_UNDEFID) p_filetype = CDI_FILETYPE_GRB;
74 
75 #ifndef HAVE_NC4HDF5_THREADSAFE
76   const auto outputFileTypeIsNetCDF4 = (p_filetype == CDI_FILETYPE_NC4 || p_filetype == CDI_FILETYPE_NC4C);
77   if (inputFileTypeIsNetCDF4 && outputFileTypeIsNetCDF4 && get_process_num() > 1 && Threading::cdoLockIO == false)
78     {
79       cdo_warning("Using a non-thread-safe NetCDF4/HDF5 library in a multi-threaded environment may lead to erroneous results!");
80       cdo_warning("Use a thread-safe NetCDF4/HDF5 library or the CDO option -L to avoid such errors.");
81     }
82 #endif
83 
84   // TODO FIX THIS: if (FileStream::timersEnabled()) timer_start(timer_write);
85 
86   open_lock();
87   const auto fileID = streamOpenWrite(m_filename.c_str(), p_filetype);
88   open_unlock();
89 
90   // TODO FIX THIS: if(FileStream::timersEnabled()) timer_stop(timer_write);
91   if (fileID < 0) cdi_open_error(fileID, "Open failed on >%s<", m_name.c_str());
92   isopen = true;
93 
94   if (CdoDefault::Byteorder != CDI_UNDEFID) streamDefByteorder(fileID, CdoDefault::Byteorder);
95 
96   set_compression(fileID, p_filetype);
97 
98   m_fileID = fileID;
99   m_filetype = p_filetype;
100 
101   return m_cdoStreamID;
102 }
103 
104 int
open_append()105 FileStream::open_append()
106 {
107   if (FileStream::timersEnabled()) timer_start(timer_write);
108 
109   open_lock();
110   const auto fileID = streamOpenAppend(m_filename.c_str());
111   open_unlock();
112 
113   if (FileStream::timersEnabled()) timer_stop(timer_write);
114 
115   if (fileID < 0) cdi_open_error(fileID, "Open failed on >%s<", m_filename.c_str());
116 
117   isopen = true;
118 
119   m_filetype = streamInqFiletype(fileID);
120   set_compression(fileID, m_filetype);
121 
122   m_fileID = fileID;
123 
124   return m_fileID;
125 }
126 
127 void
def_vlist(const int p_vlistID)128 FileStream::def_vlist(const int p_vlistID)
129 {
130   cdo_append_history(p_vlistID, command_line());
131 
132   if (CdoDefault::DataType != CDI_UNDEFID)
133     {
134       const auto nvars = vlistNvars(p_vlistID);
135       for (int varID = 0; varID < nvars; ++varID) vlistDefVarDatatype(p_vlistID, varID, CdoDefault::DataType);
136 
137       if (CdoDefault::DataType == CDI_DATATYPE_FLT64 || CdoDefault::DataType == CDI_DATATYPE_FLT32)
138         {
139           for (int varID = 0; varID < nvars; varID++)
140             {
141               vlistDefVarAddoffset(p_vlistID, varID, 0.0);
142               vlistDefVarScalefactor(p_vlistID, varID, 1.0);
143             }
144         }
145     }
146 
147   if (Options::cdoChunkType != CDI_UNDEFID)
148     {
149       const auto nvars = vlistNvars(p_vlistID);
150       for (int varID = 0; varID < nvars; ++varID) vlistDefVarChunkType(p_vlistID, varID, Options::cdoChunkType);
151     }
152 
153   if (Options::CMOR_Mode)
154     {
155       cdo_def_tracking_id(p_vlistID, "tracking_id");
156       cdo_def_creation_date(p_vlistID);
157     }
158 
159   if (Options::VersionInfo) cdiDefAttTxt(p_vlistID, CDI_GLOBAL, "CDO", (int) strlen(cdo_comment()), cdo_comment());
160 
161 #ifdef _OPENMP
162   if (Threading::ompNumThreads > 1)
163     cdiDefAttInt(p_vlistID, CDI_GLOBAL, "cdo_openmp_thread_number", CDI_DATATYPE_INT32, 1, &Threading::ompNumThreads);
164 #endif
165   defDatarangeList(p_vlistID);
166 
167   if (FileStream::timersEnabled()) timer_start(timer_write);
168   stream_def_vlist_locked(m_fileID, p_vlistID);
169   if (FileStream::timersEnabled()) timer_stop(timer_write);
170 }
171 
172 int
inq_vlist()173 FileStream::inq_vlist()
174 {
175   if (FileStream::timersEnabled()) timer_start(timer_read);
176   const auto vlistID = stream_inq_vlist_locked(m_fileID);
177   if (FileStream::timersEnabled()) timer_stop(timer_read);
178   if (vlistID == -1) cdo_abort("Couldn't read data from input fileID %d!", m_fileID);
179 
180   const auto nsubtypes = vlistNsubtypes(vlistID);
181   if (nsubtypes > 1) cdo_warning("Subtypes are unsupported, the processing results are possibly wrong!");
182 
183   if (CdoDefault::TaxisType != CDI_UNDEFID) taxisDefType(vlistInqTaxis(vlistID), CdoDefault::TaxisType);
184 
185   m_vlistID = vlistID;
186   return m_vlistID;
187 }
188 
189 void
inq_record(int * const varID,int * const levelID)190 FileStream::inq_record(int *const varID, int *const levelID)
191 {
192   if (FileStream::timersEnabled()) timer_start(timer_read);
193   stream_inq_rec_locked(m_fileID, varID, levelID);
194   if (FileStream::timersEnabled()) timer_stop(timer_read);
195   m_varID = *varID;
196 }
197 
198 void
defRecord(const int varID,const int levelID)199 FileStream::defRecord(const int varID, const int levelID)
200 {
201   if (FileStream::timersEnabled()) timer_start(timer_write);
202   stream_def_rec_locked(m_fileID, varID, levelID);
203   if (FileStream::timersEnabled()) timer_stop(timer_write);
204   m_varID = varID;
205 }
206 
207 void
read_record(float * const p_data,size_t * const nmiss)208 FileStream::read_record(float *const p_data, size_t *const nmiss)
209 {
210   if (FileStream::timersEnabled()) timer_start(timer_read);
211   stream_readrecord_float_locked(m_fileID, p_data, nmiss);
212   if (FileStream::timersEnabled()) timer_stop(timer_read);
213 }
214 
215 void
read_record(double * const p_data,size_t * const nmiss)216 FileStream::read_record(double *const p_data, size_t *const nmiss)
217 {
218   if (FileStream::timersEnabled()) timer_start(timer_read);
219   stream_readrecord_double_locked(m_fileID, p_data, nmiss);
220   if (FileStream::timersEnabled()) timer_stop(timer_read);
221 }
222 
223 void
read_record(Field * const p_field,size_t * const nmiss)224 FileStream::read_record(Field *const p_field, size_t *const nmiss)
225 {
226   read_record(p_field->vec_d.data(), nmiss);
227 }
228 
229 void
write_record(float * const p_data,const size_t p_nmiss)230 FileStream::write_record(float *const p_data, const size_t p_nmiss)
231 {
232   if (FileStream::timersEnabled()) timer_start(timer_write);
233 
234   const auto varID = m_varID;
235   if (varID < (int) m_datarangelist.size())
236     if (m_datarangelist[varID].check_datarange) m_datarangelist[varID].checkDatarange(p_data, p_nmiss);
237 
238   stream_write_record_float_locked(m_fileID, p_data, p_nmiss);
239 
240   if (FileStream::timersEnabled()) timer_stop(timer_write);
241 }
242 
243 void
write_record(double * const p_data,const size_t p_nmiss)244 FileStream::write_record(double *const p_data, const size_t p_nmiss)
245 {
246   if (FileStream::timersEnabled()) timer_start(timer_write);
247 
248   const auto varID = m_varID;
249   if (varID < (int) m_datarangelist.size())
250     if (m_datarangelist[varID].check_datarange) m_datarangelist[varID].checkDatarange(p_data, p_nmiss);
251 
252   stream_write_record_double_locked(m_fileID, p_data, p_nmiss);
253 
254   if (FileStream::timersEnabled()) timer_stop(timer_write);
255 }
256 
257 void
write_record(Field * const p_field,const size_t p_nmiss)258 FileStream::write_record(Field *const p_field, const size_t p_nmiss)
259 {
260   write_record(p_field->vec_d.data(), p_nmiss);
261 }
262 
263 void
copyRecord(CdoStreamID p_destination)264 FileStream::copyRecord(CdoStreamID p_destination)
265 {
266   FileStream *fStream = dynamic_cast<FileStream *>(p_destination.get());
267   stream_copy_record_locked(m_fileID, fStream->getFileID());
268 }
/*
 * FileStream::inq_timestep(int p_tsID)
 * Sets the internal state of the CDI data structure to work on the given timestep (p_tsID) and returns the number of records
 * that the timestep contains.
 * If the timestep ID is 0 AND a default time axis type is set (CdoDefault::TaxisType), that type is applied to the time axis.
 * When the timestep inquiry was successful, m_tsID is set to the requested p_tsID if p_tsID != m_tsID.
 * When only one process is running the timers are enabled.
 * -- last documentation update (2019-06-14) --
 */
278 int
inq_timestep(const int p_tsID)279 FileStream::inq_timestep(const int p_tsID)
280 {
281   if (FileStream::timersEnabled()) timer_start(timer_read);
282   const auto nrecs = stream_inq_time_step_locked(m_fileID, p_tsID);
283   if (FileStream::timersEnabled()) timer_stop(timer_read);
284 
285   if (p_tsID == 0 && CdoDefault::TaxisType != CDI_UNDEFID) taxisDefType(vlistInqTaxis(m_vlistID), CdoDefault::TaxisType);
286 
287   if (nrecs && p_tsID != m_tsID) m_tsID = p_tsID;
288   Debug(FILE_STREAM, "Current TsID: %d,  nrecs: %d", m_tsID, nrecs);
289 
290   return nrecs;
291 }
292 
293 void
def_timestep(const int p_tsID)294 FileStream::def_timestep(const int p_tsID)
295 {
296   if (FileStream::timersEnabled()) timer_start(timer_write);
297   // don't use sync -> very slow on GPFS
298   //  if ( p_tsID > 0 ) streamSync(fileID);
299 
300   stream_def_time_step_locked(m_fileID, p_tsID);
301 
302   if (FileStream::timersEnabled()) timer_stop(timer_write);
303 }
304 
305 int
inqFileType()306 FileStream::inqFileType()
307 {
308   return streamInqFiletype(m_fileID);
309 }
310 
311 int
inqByteorder()312 FileStream::inqByteorder()
313 {
314   return streamInqByteorder(m_fileID);
315 }
316 
317 size_t
getNvals()318 FileStream::getNvals()
319 {
320   // set when the stream is closed
321   // see: FileStream::close()
322   return m_nvals;
323 }
324 
325 int
getFileID()326 FileStream::getFileID()
327 {
328   return m_fileID;
329 }
330 
331 void
close()332 FileStream::close()
333 {
334   Debug(FILE_STREAM, "%s fileID %d", m_name, m_fileID);
335 
336   m_nvals = streamNvals(m_fileID);
337   stream_close_locked(m_fileID);
338 
339   isopen = false;
340   m_vlistID = -1;
341 
342   if (m_datarangelist.size())
343     {
344       m_datarangelist.clear();
345       m_datarangelist.shrink_to_fit();
346     }
347 }
348