1 #ifdef HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4
5 #include <limits.h>
6 #include <stdio.h>
7 #include <string.h>
8
9 #include "dmemory.h"
10
11 #include "cdi.h"
12 #include "cdi_int.h"
13 #include "stream_grb.h"
14 #include "stream_cdf.h"
15 #include "stream_srv.h"
16 #include "stream_ext.h"
17 #include "stream_ieg.h"
18
19
recordInitEntry(record_t * record)20 void recordInitEntry(record_t *record)
21 {
22 record->position = CDI_UNDEFID;
23 record->size = 0;
24 record->gridsize = 0;
25 record->param = 0;
26 record->ilevel = CDI_UNDEFID;
27 record->used = false;
28 record->tsteptype = CDI_UNDEFID;
29 record->varID = CDI_UNDEFID;
30 record->levelID = CDI_UNDEFID;
31 memset(record->varname, 0, sizeof(record->varname));
32 varScanKeysInit(&record->scanKeys);
33 memset(&record->tiles, 0, sizeof(record->tiles));
34 #ifdef HAVE_LIBFDB5
35 record->fdbItem = NULL;
36 #endif
37 }
38
39
recordNewEntry(stream_t * streamptr,int tsID)40 int recordNewEntry(stream_t *streamptr, int tsID)
41 {
42 size_t recordID = 0;
43 size_t recordSize = (size_t)streamptr->tsteps[tsID].recordSize;
44 record_t *records = streamptr->tsteps[tsID].records;
45 /*
46 Look for a free slot in record.
47 (Create the table the first time through).
48 */
49 if ( ! recordSize )
50 {
51 recordSize = 1; /* <<<<---- */
52 records = (record_t *) Malloc(recordSize * sizeof (record_t));
53
54 for ( size_t i = 0; i < recordSize; i++ )
55 records[i].used = CDI_UNDEFID;
56 }
57 else
58 {
59 while ( recordID < recordSize && records[recordID].used != CDI_UNDEFID )
60 ++recordID;
61 }
62 /*
63 If the table overflows, double its size.
64 */
65 if ( recordID == recordSize )
66 {
67 if (recordSize <= INT_MAX / 2)
68 recordSize *= 2;
69 else if (recordSize < INT_MAX)
70 recordSize = INT_MAX;
71 else
72 Error("Cannot handle this many records!\n");
73 records = (record_t *) Realloc(records, recordSize * sizeof (record_t));
74
75 for ( size_t i = recordID; i < recordSize; i++ )
76 records[i].used = CDI_UNDEFID;
77 }
78
79 recordInitEntry(&records[recordID]);
80
81 records[recordID].used = 1;
82
83 streamptr->tsteps[tsID].recordSize = (int)recordSize;
84 streamptr->tsteps[tsID].records = records;
85
86 return (int)recordID;
87 }
88
89 static
cdiInitRecord(stream_t * streamptr)90 void cdiInitRecord(stream_t *streamptr)
91 {
92 Record *record = (Record *) Malloc(sizeof(Record));
93 streamptr->record = record;
94
95 record->param = 0;
96 record->level = 0;
97 record->date = 0;
98 record->time = 0;
99 record->gridID = 0;
100 record->buffer = NULL;
101 record->buffersize = 0;
102 record->position = 0;
103 record->varID = 0;
104 record->levelID = CDI_UNDEFID;
105 }
106
107
streamInqRecord(int streamID,int * varID,int * levelID)108 void streamInqRecord(int streamID, int *varID, int *levelID)
109 {
110 check_parg(varID);
111 check_parg(levelID);
112
113 stream_t *streamptr = stream_to_pointer(streamID);
114
115 cdiDefAccesstype(streamID, TYPE_REC);
116
117 if ( ! streamptr->record ) cdiInitRecord(streamptr);
118
119 int tsID = streamptr->curTsID;
120 int rindex = streamptr->tsteps[tsID].curRecID + 1;
121
122 if ( rindex >= streamptr->tsteps[tsID].nrecs )
123 Error("record %d not available at timestep %d", rindex+1, tsID+1);
124
125 int recID = streamptr->tsteps[tsID].recIDs[rindex];
126
127 if ( recID == -1 || recID >= streamptr->tsteps[tsID].nallrecs )
128 Error("Internal problem! tsID = %d recID = %d", tsID, recID);
129
130 *varID = streamptr->tsteps[tsID].records[recID].varID;
131 int lindex = streamptr->tsteps[tsID].records[recID].levelID;
132
133 int isub = subtypeInqActiveIndex(streamptr->vars[*varID].subtypeID);
134 *levelID = streamptr->vars[*varID].recordTable[isub].lindex[lindex];
135
136 if ( CDI_Debug )
137 Message("streamID = %d tsID = %d, recID = %d, varID = %d, levelID = %d", streamID, tsID, recID, *varID, *levelID);
138
139 streamptr->curTsID = tsID;
140 streamptr->tsteps[tsID].curRecID = rindex;
141 }
142
143 /*
144 @Function streamDefRecord
145 @Title Define the next record
146
147 @Prototype void streamDefRecord(int streamID, int varID, int levelID)
148 @Parameter
149 @Item streamID Stream ID, from a previous call to @fref{streamOpenWrite}.
150 @Item varID Variable identifier.
151 @Item levelID Level identifier.
152
153 @Description
154 The function streamDefRecord defines the meta-data of the next record.
155 @EndFunction
156 */
streamDefRecord(int streamID,int varID,int levelID)157 void streamDefRecord(int streamID, int varID, int levelID)
158 {
159 stream_t *streamptr = stream_to_pointer(streamID);
160
161 int tsID = streamptr->curTsID;
162
163 if ( tsID == CDI_UNDEFID )
164 {
165 tsID++;
166 streamDefTimestep(streamID, tsID);
167 }
168
169 if ( ! streamptr->record ) cdiInitRecord(streamptr);
170
171 int vlistID = streamptr->vlistID;
172 int gridID = vlistInqVarGrid(vlistID, varID);
173 int zaxisID = vlistInqVarZaxis(vlistID, varID);
174 int param = vlistInqVarParam(vlistID, varID);
175 int level = (int)(zaxisInqLevel(zaxisID, levelID));
176
177 Record *record = streamptr->record;
178 record->varID = varID;
179 record->levelID = levelID;
180 record->param = param;
181 record->level = level;
182 record->date = streamptr->tsteps[tsID].taxis.vdate;
183 record->time = streamptr->tsteps[tsID].taxis.vtime;
184 record->gridID = gridID;
185 record->prec = vlistInqVarDatatype(vlistID, varID);
186
187 switch (cdiBaseFiletype(streamptr->filetype))
188 {
189 #ifdef HAVE_LIBGRIB
190 case CDI_FILETYPE_GRB:
191 case CDI_FILETYPE_GRB2:
192 grbDefRecord(streamptr);
193 break;
194 #endif
195 #ifdef HAVE_LIBSERVICE
196 case CDI_FILETYPE_SRV:
197 srvDefRecord(streamptr);
198 break;
199 #endif
200 #ifdef HAVE_LIBEXTRA
201 case CDI_FILETYPE_EXT:
202 extDefRecord(streamptr);
203 break;
204 #endif
205 #ifdef HAVE_LIBIEG
206 case CDI_FILETYPE_IEG:
207 iegDefRecord(streamptr);
208 break;
209 #endif
210 #ifdef HAVE_LIBNETCDF
211 case CDI_FILETYPE_NETCDF:
212 if ( streamptr->accessmode == 0 ) cdfEndDef(streamptr);
213 cdfDefRecord(streamptr);
214 break;
215 #endif
216 default:
217 Error("%s support not compiled in!", strfiletype(streamptr->filetype));
218 break;
219 }
220 }
221
222
streamCopyRecord(int streamID2,int streamID1)223 void streamCopyRecord(int streamID2, int streamID1)
224 {
225 stream_t *streamptr1 = stream_to_pointer(streamID1),
226 *streamptr2 = stream_to_pointer(streamID2);
227 int filetype1 = streamptr1->filetype,
228 filetype2 = streamptr2->filetype,
229 filetype = CDI_FILETYPE_UNDEF;
230
231 if (cdiBaseFiletype(filetype1) == cdiBaseFiletype(filetype2)) filetype = filetype2;
232
233 if ( filetype == CDI_FILETYPE_UNDEF )
234 Error("Streams have different file types (%s -> %s)!", strfiletype(filetype1), strfiletype(filetype2));
235
236 switch (cdiBaseFiletype(filetype))
237 {
238 #ifdef HAVE_LIBGRIB
239 case CDI_FILETYPE_GRB:
240 case CDI_FILETYPE_GRB2:
241 grbCopyRecord(streamptr2, streamptr1);
242 break;
243 #endif
244 #ifdef HAVE_LIBSERVICE
245 case CDI_FILETYPE_SRV:
246 srvCopyRecord(streamptr2, streamptr1);
247 break;
248 #endif
249 #ifdef HAVE_LIBEXTRA
250 case CDI_FILETYPE_EXT:
251 extCopyRecord(streamptr2, streamptr1);
252 break;
253 #endif
254 #ifdef HAVE_LIBIEG
255 case CDI_FILETYPE_IEG:
256 iegCopyRecord(streamptr2, streamptr1);
257 break;
258 #endif
259 #ifdef HAVE_LIBNETCDF
260 case CDI_FILETYPE_NETCDF:
261 cdfCopyRecord(streamptr2, streamptr1);
262 break;
263 #endif
264 default:
265 {
266 Error("%s support not compiled in!", strfiletype(filetype));
267 break;
268 }
269 }
270 }
271
272
cdi_create_records(stream_t * streamptr,int tsID)273 void cdi_create_records(stream_t *streamptr, int tsID)
274 {
275 unsigned nrecords, maxrecords;
276
277 tsteps_t *sourceTstep = streamptr->tsteps;
278 tsteps_t *destTstep = sourceTstep + tsID;
279
280 if ( destTstep->records ) return;
281
282 int vlistID = streamptr->vlistID;
283
284 if ( tsID == 0 )
285 {
286 maxrecords = 0;
287 int nvars = streamptr->nvars;
288 for ( int varID = 0; varID < nvars; varID++)
289 for (int isub=0; isub<streamptr->vars[varID].subtypeSize; isub++)
290 maxrecords += (unsigned)streamptr->vars[varID].recordTable[isub].nlevs;
291 }
292 else
293 {
294 maxrecords = (unsigned)sourceTstep->recordSize;
295 }
296
297 if ( tsID == 0 )
298 {
299 nrecords = maxrecords;
300 }
301 else if ( tsID == 1 )
302 {
303 nrecords = 0;
304 maxrecords = (unsigned)sourceTstep->recordSize;
305 for ( unsigned recID = 0; recID < maxrecords; recID++ )
306 {
307 int varID = sourceTstep->records[recID].varID;
308 nrecords += (varID == CDI_UNDEFID /* varID = CDI_UNDEFID for write mode !!! */
309 || vlistInqVarTimetype(vlistID, varID) != TIME_CONSTANT);
310 // printf("varID nrecords %d %d %d \n", varID, nrecords, vlistInqVarTsteptype(vlistID, varID));
311 }
312 }
313 else
314 {
315 nrecords = (unsigned)streamptr->tsteps[1].nallrecs;
316 }
317 // printf("tsID, nrecords %d %d\n", tsID, nrecords);
318
319 record_t *records = NULL;
320 if ( maxrecords > 0 ) records = (record_t *) Malloc(maxrecords*sizeof(record_t));
321
322 destTstep->records = records;
323 destTstep->recordSize = (int)maxrecords;
324 destTstep->nallrecs = (int)nrecords;
325
326 if ( tsID == 0 )
327 {
328 for ( unsigned recID = 0; recID < maxrecords; recID++ )
329 recordInitEntry(&destTstep->records[recID]);
330 }
331 else
332 {
333 memcpy(destTstep->records, sourceTstep->records, (size_t)maxrecords*sizeof(record_t));
334
335 for ( unsigned recID = 0; recID < maxrecords; recID++ )
336 {
337 record_t *curRecord = &sourceTstep->records[recID];
338 destTstep->records[recID].used = curRecord->used;
339 if ( curRecord->used != CDI_UNDEFID && curRecord->varID != -1 ) /* curRecord->varID = -1 for write mode !!! */
340 {
341 if ( vlistInqVarTimetype(vlistID, curRecord->varID) != TIME_CONSTANT )
342 {
343 destTstep->records[recID].position = CDI_UNDEFID;
344 destTstep->records[recID].size = 0;
345 destTstep->records[recID].used = false;
346 }
347 }
348 }
349 }
350 }
351
352 #include "file.h"
353
streamFCopyRecord(stream_t * streamptr2,stream_t * streamptr1,const char * container_name)354 void streamFCopyRecord(stream_t *streamptr2, stream_t *streamptr1, const char *container_name)
355 {
356 int fileID1 = streamptr1->fileID;
357 int fileID2 = streamptr2->fileID;
358
359 int tsID = streamptr1->curTsID;
360 int vrecID = streamptr1->tsteps[tsID].curRecID;
361 int recID = streamptr1->tsteps[tsID].recIDs[vrecID];
362 off_t recpos = streamptr1->tsteps[tsID].records[recID].position;
363 size_t recsize = streamptr1->tsteps[tsID].records[recID].size;
364
365 if (fileSetPos(fileID1, recpos, SEEK_SET) != 0)
366 Error("Cannot seek input file for %s record copy!", container_name);
367
368 char *buffer = (char *) Malloc(recsize);
369
370 if (fileRead(fileID1, buffer, recsize) != recsize)
371 Error("Failed to read record from %s file for copying!", container_name);
372
373 if (fileWrite(fileID2, buffer, recsize) != recsize)
374 Error("Failed to write record to %s file when copying!", container_name);
375
376 Free(buffer);
377 }
378 /*
379 * Local Variables:
380 * c-file-style: "Java"
381 * c-basic-offset: 2
382 * indent-tabs-mode: nil
383 * show-trailing-whitespace: t
384 * require-trailing-newline: t
385 * End:
386 */
387