1 /*
2 This file is part of CDO. CDO is a collection of Operators to manipulate and analyse Climate model Data.
3
4 Author: Uwe Schulzweida
5
6 */
7
8 #ifdef HAVE_CONFIG_H
9 #include "config.h" /* HAVE_NC4HDF5_THREADSAFE */
10 #endif
11
12 #include <sys/stat.h> /* stat */
13 #include <cdi.h>
14
15 #include "fileStream.h"
16
17 #include "cdi_lockedIO.h"
18 #include "cdo_options.h"
19 #include "cdo_output.h"
20 #include "cdo_default_values.h"
21 #include "cdo_history.h"
22 #include "process.h"
23
24 #include "timer.h"
25 #include "commandline.h"
26
27 bool FileStream::TimerEnabled = false;
28 #ifndef HAVE_NC4HDF5_THREADSAFE
29 static auto inputFileTypeIsNetCDF4 = false;
30 #endif
31
FileStream(const std::string & p_fileName)32 FileStream::FileStream(const std::string &p_fileName) : m_filename(p_fileName)
33 {
34 m_name = p_fileName;
35 m_fileID = CDI_UNDEFID;
36 }
37
38 int
open_read()39 FileStream::open_read()
40 {
41 open_lock();
42
43 const auto fileID = streamOpenRead(m_filename.c_str());
44 if (fileID < 0) cdi_open_error(fileID, "Open failed on >%s<", m_filename.c_str());
45 isopen = true;
46
47 m_filetype = streamInqFiletype(fileID);
48 if (CdoDefault::FileType == CDI_UNDEFID) CdoDefault::FileType = m_filetype;
49 m_fileID = fileID;
50
51 #ifndef HAVE_NC4HDF5_THREADSAFE
52 if (m_filetype == CDI_FILETYPE_NC4 || m_filetype == CDI_FILETYPE_NC4C) inputFileTypeIsNetCDF4 = true;
53 #endif
54
55 Debug(FILE_STREAM, "Set number of worker to %d", Options::numStreamWorker);
56 if (Options::numStreamWorker > 0) streamDefNumWorker(fileID, Options::numStreamWorker);
57
58 open_unlock();
59
60 return fileID;
61 }
62
63 int
open_write(int p_filetype)64 FileStream::open_write(int p_filetype)
65 {
66 if (Options::cdoInteractive)
67 {
68 struct stat stbuf;
69 const auto rstatus = stat(m_name.c_str(), &stbuf);
70 // If permanent file already exists, query user whether to overwrite or exit
71 if (rstatus != -1) query_user_exit(m_name.c_str());
72 }
73 if (p_filetype == CDI_UNDEFID) p_filetype = CDI_FILETYPE_GRB;
74
75 #ifndef HAVE_NC4HDF5_THREADSAFE
76 const auto outputFileTypeIsNetCDF4 = (p_filetype == CDI_FILETYPE_NC4 || p_filetype == CDI_FILETYPE_NC4C);
77 if (inputFileTypeIsNetCDF4 && outputFileTypeIsNetCDF4 && get_process_num() > 1 && Threading::cdoLockIO == false)
78 {
79 cdo_warning("Using a non-thread-safe NetCDF4/HDF5 library in a multi-threaded environment may lead to erroneous results!");
80 cdo_warning("Use a thread-safe NetCDF4/HDF5 library or the CDO option -L to avoid such errors.");
81 }
82 #endif
83
84 // TODO FIX THIS: if (FileStream::timersEnabled()) timer_start(timer_write);
85
86 open_lock();
87 const auto fileID = streamOpenWrite(m_filename.c_str(), p_filetype);
88 open_unlock();
89
90 // TODO FIX THIS: if(FileStream::timersEnabled()) timer_stop(timer_write);
91 if (fileID < 0) cdi_open_error(fileID, "Open failed on >%s<", m_name.c_str());
92 isopen = true;
93
94 if (CdoDefault::Byteorder != CDI_UNDEFID) streamDefByteorder(fileID, CdoDefault::Byteorder);
95
96 set_compression(fileID, p_filetype);
97
98 m_fileID = fileID;
99 m_filetype = p_filetype;
100
101 return m_cdoStreamID;
102 }
103
104 int
open_append()105 FileStream::open_append()
106 {
107 if (FileStream::timersEnabled()) timer_start(timer_write);
108
109 open_lock();
110 const auto fileID = streamOpenAppend(m_filename.c_str());
111 open_unlock();
112
113 if (FileStream::timersEnabled()) timer_stop(timer_write);
114
115 if (fileID < 0) cdi_open_error(fileID, "Open failed on >%s<", m_filename.c_str());
116
117 isopen = true;
118
119 m_filetype = streamInqFiletype(fileID);
120 set_compression(fileID, m_filetype);
121
122 m_fileID = fileID;
123
124 return m_fileID;
125 }
126
127 void
def_vlist(const int p_vlistID)128 FileStream::def_vlist(const int p_vlistID)
129 {
130 cdo_append_history(p_vlistID, command_line());
131
132 if (CdoDefault::DataType != CDI_UNDEFID)
133 {
134 const auto nvars = vlistNvars(p_vlistID);
135 for (int varID = 0; varID < nvars; ++varID) vlistDefVarDatatype(p_vlistID, varID, CdoDefault::DataType);
136
137 if (CdoDefault::DataType == CDI_DATATYPE_FLT64 || CdoDefault::DataType == CDI_DATATYPE_FLT32)
138 {
139 for (int varID = 0; varID < nvars; varID++)
140 {
141 vlistDefVarAddoffset(p_vlistID, varID, 0.0);
142 vlistDefVarScalefactor(p_vlistID, varID, 1.0);
143 }
144 }
145 }
146
147 if (Options::cdoChunkType != CDI_UNDEFID)
148 {
149 const auto nvars = vlistNvars(p_vlistID);
150 for (int varID = 0; varID < nvars; ++varID) vlistDefVarChunkType(p_vlistID, varID, Options::cdoChunkType);
151 }
152
153 if (Options::CMOR_Mode)
154 {
155 cdo_def_tracking_id(p_vlistID, "tracking_id");
156 cdo_def_creation_date(p_vlistID);
157 }
158
159 if (Options::VersionInfo) cdiDefAttTxt(p_vlistID, CDI_GLOBAL, "CDO", (int) strlen(cdo_comment()), cdo_comment());
160
161 #ifdef _OPENMP
162 if (Threading::ompNumThreads > 1)
163 cdiDefAttInt(p_vlistID, CDI_GLOBAL, "cdo_openmp_thread_number", CDI_DATATYPE_INT32, 1, &Threading::ompNumThreads);
164 #endif
165 defDatarangeList(p_vlistID);
166
167 if (FileStream::timersEnabled()) timer_start(timer_write);
168 stream_def_vlist_locked(m_fileID, p_vlistID);
169 if (FileStream::timersEnabled()) timer_stop(timer_write);
170 }
171
172 int
inq_vlist()173 FileStream::inq_vlist()
174 {
175 if (FileStream::timersEnabled()) timer_start(timer_read);
176 const auto vlistID = stream_inq_vlist_locked(m_fileID);
177 if (FileStream::timersEnabled()) timer_stop(timer_read);
178 if (vlistID == -1) cdo_abort("Couldn't read data from input fileID %d!", m_fileID);
179
180 const auto nsubtypes = vlistNsubtypes(vlistID);
181 if (nsubtypes > 1) cdo_warning("Subtypes are unsupported, the processing results are possibly wrong!");
182
183 if (CdoDefault::TaxisType != CDI_UNDEFID) taxisDefType(vlistInqTaxis(vlistID), CdoDefault::TaxisType);
184
185 m_vlistID = vlistID;
186 return m_vlistID;
187 }
188
189 void
inq_record(int * const varID,int * const levelID)190 FileStream::inq_record(int *const varID, int *const levelID)
191 {
192 if (FileStream::timersEnabled()) timer_start(timer_read);
193 stream_inq_rec_locked(m_fileID, varID, levelID);
194 if (FileStream::timersEnabled()) timer_stop(timer_read);
195 m_varID = *varID;
196 }
197
198 void
defRecord(const int varID,const int levelID)199 FileStream::defRecord(const int varID, const int levelID)
200 {
201 if (FileStream::timersEnabled()) timer_start(timer_write);
202 stream_def_rec_locked(m_fileID, varID, levelID);
203 if (FileStream::timersEnabled()) timer_stop(timer_write);
204 m_varID = varID;
205 }
206
207 void
read_record(float * const p_data,size_t * const nmiss)208 FileStream::read_record(float *const p_data, size_t *const nmiss)
209 {
210 if (FileStream::timersEnabled()) timer_start(timer_read);
211 stream_readrecord_float_locked(m_fileID, p_data, nmiss);
212 if (FileStream::timersEnabled()) timer_stop(timer_read);
213 }
214
215 void
read_record(double * const p_data,size_t * const nmiss)216 FileStream::read_record(double *const p_data, size_t *const nmiss)
217 {
218 if (FileStream::timersEnabled()) timer_start(timer_read);
219 stream_readrecord_double_locked(m_fileID, p_data, nmiss);
220 if (FileStream::timersEnabled()) timer_stop(timer_read);
221 }
222
223 void
read_record(Field * const p_field,size_t * const nmiss)224 FileStream::read_record(Field *const p_field, size_t *const nmiss)
225 {
226 read_record(p_field->vec_d.data(), nmiss);
227 }
228
229 void
write_record(float * const p_data,const size_t p_nmiss)230 FileStream::write_record(float *const p_data, const size_t p_nmiss)
231 {
232 if (FileStream::timersEnabled()) timer_start(timer_write);
233
234 const auto varID = m_varID;
235 if (varID < (int) m_datarangelist.size())
236 if (m_datarangelist[varID].check_datarange) m_datarangelist[varID].checkDatarange(p_data, p_nmiss);
237
238 stream_write_record_float_locked(m_fileID, p_data, p_nmiss);
239
240 if (FileStream::timersEnabled()) timer_stop(timer_write);
241 }
242
243 void
write_record(double * const p_data,const size_t p_nmiss)244 FileStream::write_record(double *const p_data, const size_t p_nmiss)
245 {
246 if (FileStream::timersEnabled()) timer_start(timer_write);
247
248 const auto varID = m_varID;
249 if (varID < (int) m_datarangelist.size())
250 if (m_datarangelist[varID].check_datarange) m_datarangelist[varID].checkDatarange(p_data, p_nmiss);
251
252 stream_write_record_double_locked(m_fileID, p_data, p_nmiss);
253
254 if (FileStream::timersEnabled()) timer_stop(timer_write);
255 }
256
257 void
write_record(Field * const p_field,const size_t p_nmiss)258 FileStream::write_record(Field *const p_field, const size_t p_nmiss)
259 {
260 write_record(p_field->vec_d.data(), p_nmiss);
261 }
262
263 void
copyRecord(CdoStreamID p_destination)264 FileStream::copyRecord(CdoStreamID p_destination)
265 {
266 FileStream *fStream = dynamic_cast<FileStream *>(p_destination.get());
267 stream_copy_record_locked(m_fileID, fStream->getFileID());
268 }
/*
 * FileStream::inq_timestep(int p_tsID)
 * Sets the internal state of the CDI data structure to work on the given timestep (p_tsID) and returns the number of records
 * that the timestep contains.
 * Inquires the time axis and defines its type if the timestep ID is 0 AND a default time axis type (CdoDefault::TaxisType)
 * is set.
 * When the timestep inquiry was successful, m_tsID is set to the wanted p_tsID IF p_tsID != m_tsID.
 * When only one process is running the timers are enabled.
 * -- last Documentation update(2019-06-14) --
 */
278 int
inq_timestep(const int p_tsID)279 FileStream::inq_timestep(const int p_tsID)
280 {
281 if (FileStream::timersEnabled()) timer_start(timer_read);
282 const auto nrecs = stream_inq_time_step_locked(m_fileID, p_tsID);
283 if (FileStream::timersEnabled()) timer_stop(timer_read);
284
285 if (p_tsID == 0 && CdoDefault::TaxisType != CDI_UNDEFID) taxisDefType(vlistInqTaxis(m_vlistID), CdoDefault::TaxisType);
286
287 if (nrecs && p_tsID != m_tsID) m_tsID = p_tsID;
288 Debug(FILE_STREAM, "Current TsID: %d, nrecs: %d", m_tsID, nrecs);
289
290 return nrecs;
291 }
292
293 void
def_timestep(const int p_tsID)294 FileStream::def_timestep(const int p_tsID)
295 {
296 if (FileStream::timersEnabled()) timer_start(timer_write);
297 // don't use sync -> very slow on GPFS
298 // if ( p_tsID > 0 ) streamSync(fileID);
299
300 stream_def_time_step_locked(m_fileID, p_tsID);
301
302 if (FileStream::timersEnabled()) timer_stop(timer_write);
303 }
304
305 int
inqFileType()306 FileStream::inqFileType()
307 {
308 return streamInqFiletype(m_fileID);
309 }
310
311 int
inqByteorder()312 FileStream::inqByteorder()
313 {
314 return streamInqByteorder(m_fileID);
315 }
316
317 size_t
getNvals()318 FileStream::getNvals()
319 {
320 // set when the stream is closed
321 // see: FileStream::close()
322 return m_nvals;
323 }
324
325 int
getFileID()326 FileStream::getFileID()
327 {
328 return m_fileID;
329 }
330
331 void
close()332 FileStream::close()
333 {
334 Debug(FILE_STREAM, "%s fileID %d", m_name, m_fileID);
335
336 m_nvals = streamNvals(m_fileID);
337 stream_close_locked(m_fileID);
338
339 isopen = false;
340 m_vlistID = -1;
341
342 if (m_datarangelist.size())
343 {
344 m_datarangelist.clear();
345 m_datarangelist.shrink_to_fit();
346 }
347 }
348