1 #ifdef  HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4 
5 #include <limits.h>
6 #include <stdio.h>
7 #include <string.h>
8 
9 #include "dmemory.h"
10 
11 #include "cdi.h"
12 #include "cdi_int.h"
13 #include "stream_grb.h"
14 #include "stream_cdf.h"
15 #include "stream_srv.h"
16 #include "stream_ext.h"
17 #include "stream_ieg.h"
18 
19 
recordInitEntry(record_t * record)20 void recordInitEntry(record_t *record)
21 {
22   record->position  = CDI_UNDEFID;
23   record->size      = 0;
24   record->gridsize  = 0;
25   record->param     = 0;
26   record->ilevel    = CDI_UNDEFID;
27   record->used      = false;
28   record->tsteptype = CDI_UNDEFID;
29   record->varID     = CDI_UNDEFID;
30   record->levelID   = CDI_UNDEFID;
31   memset(record->varname, 0, sizeof(record->varname));
32   varScanKeysInit(&record->scanKeys);
33   memset(&record->tiles, 0, sizeof(record->tiles));
34 #ifdef HAVE_LIBFDB5
35   record->fdbItem   = NULL;
36 #endif
37 }
38 
39 
recordNewEntry(stream_t * streamptr,int tsID)40 int recordNewEntry(stream_t *streamptr, int tsID)
41 {
42   size_t recordID = 0;
43   size_t recordSize = (size_t)streamptr->tsteps[tsID].recordSize;
44   record_t *records = streamptr->tsteps[tsID].records;
45   /*
46     Look for a free slot in record.
47     (Create the table the first time through).
48   */
49   if ( ! recordSize )
50     {
51       recordSize = 1;   /*  <<<<----  */
52       records = (record_t *) Malloc(recordSize * sizeof (record_t));
53 
54       for ( size_t i = 0; i < recordSize; i++ )
55 	records[i].used = CDI_UNDEFID;
56     }
57   else
58     {
59       while ( recordID < recordSize && records[recordID].used != CDI_UNDEFID )
60         ++recordID;
61     }
62   /*
63     If the table overflows, double its size.
64   */
65   if ( recordID == recordSize )
66     {
67       if (recordSize <= INT_MAX / 2)
68         recordSize *= 2;
69       else if (recordSize < INT_MAX)
70         recordSize = INT_MAX;
71       else
72         Error("Cannot handle this many records!\n");
73       records = (record_t *) Realloc(records, recordSize * sizeof (record_t));
74 
75       for ( size_t i = recordID; i < recordSize; i++ )
76 	records[i].used = CDI_UNDEFID;
77     }
78 
79   recordInitEntry(&records[recordID]);
80 
81   records[recordID].used = 1;
82 
83   streamptr->tsteps[tsID].recordSize = (int)recordSize;
84   streamptr->tsteps[tsID].records    = records;
85 
86   return (int)recordID;
87 }
88 
89 static
cdiInitRecord(stream_t * streamptr)90 void cdiInitRecord(stream_t *streamptr)
91 {
92   Record *record = (Record *) Malloc(sizeof(Record));
93   streamptr->record = record;
94 
95   record->param      = 0;
96   record->level      = 0;
97   record->date       = 0;
98   record->time       = 0;
99   record->gridID     = 0;
100   record->buffer     = NULL;
101   record->buffersize = 0;
102   record->position   = 0;
103   record->varID      = 0;
104   record->levelID    = CDI_UNDEFID;
105 }
106 
107 
streamInqRecord(int streamID,int * varID,int * levelID)108 void streamInqRecord(int streamID, int *varID, int *levelID)
109 {
110   check_parg(varID);
111   check_parg(levelID);
112 
113   stream_t *streamptr = stream_to_pointer(streamID);
114 
115   cdiDefAccesstype(streamID, TYPE_REC);
116 
117   if ( ! streamptr->record ) cdiInitRecord(streamptr);
118 
119   int tsID   = streamptr->curTsID;
120   int rindex = streamptr->tsteps[tsID].curRecID + 1;
121 
122   if ( rindex >= streamptr->tsteps[tsID].nrecs )
123     Error("record %d not available at timestep %d", rindex+1, tsID+1);
124 
125   int recID  = streamptr->tsteps[tsID].recIDs[rindex];
126 
127   if ( recID == -1 || recID >= streamptr->tsteps[tsID].nallrecs )
128     Error("Internal problem! tsID = %d recID = %d", tsID, recID);
129 
130   *varID   = streamptr->tsteps[tsID].records[recID].varID;
131   int lindex = streamptr->tsteps[tsID].records[recID].levelID;
132 
133   int isub = subtypeInqActiveIndex(streamptr->vars[*varID].subtypeID);
134   *levelID = streamptr->vars[*varID].recordTable[isub].lindex[lindex];
135 
136   if ( CDI_Debug )
137     Message("streamID = %d tsID = %d, recID = %d, varID = %d, levelID = %d", streamID, tsID, recID, *varID, *levelID);
138 
139   streamptr->curTsID = tsID;
140   streamptr->tsteps[tsID].curRecID = rindex;
141 }
142 
143 /*
144 @Function  streamDefRecord
145 @Title     Define the next record
146 
147 @Prototype void streamDefRecord(int streamID, int varID, int levelID)
148 @Parameter
149     @Item  streamID  Stream ID, from a previous call to @fref{streamOpenWrite}.
150     @Item  varID     Variable identifier.
151     @Item  levelID   Level identifier.
152 
153 @Description
154 The function streamDefRecord defines the meta-data of the next record.
155 @EndFunction
156 */
streamDefRecord(int streamID,int varID,int levelID)157 void streamDefRecord(int streamID, int varID, int levelID)
158 {
159   stream_t *streamptr = stream_to_pointer(streamID);
160 
161   int tsID = streamptr->curTsID;
162 
163   if ( tsID == CDI_UNDEFID )
164     {
165       tsID++;
166       streamDefTimestep(streamID, tsID);
167     }
168 
169   if ( ! streamptr->record ) cdiInitRecord(streamptr);
170 
171   int vlistID = streamptr->vlistID;
172   int gridID  = vlistInqVarGrid(vlistID, varID);
173   int zaxisID = vlistInqVarZaxis(vlistID, varID);
174   int param   = vlistInqVarParam(vlistID, varID);
175   int level   = (int)(zaxisInqLevel(zaxisID, levelID));
176 
177   Record *record = streamptr->record;
178   record->varID    = varID;
179   record->levelID  = levelID;
180   record->param    = param;
181   record->level    = level;
182   record->date     = streamptr->tsteps[tsID].taxis.vdate;
183   record->time     = streamptr->tsteps[tsID].taxis.vtime;
184   record->gridID   = gridID;
185   record->prec     = vlistInqVarDatatype(vlistID, varID);
186 
187   switch (cdiBaseFiletype(streamptr->filetype))
188     {
189 #ifdef HAVE_LIBGRIB
190     case CDI_FILETYPE_GRB:
191     case CDI_FILETYPE_GRB2:
192       grbDefRecord(streamptr);
193       break;
194 #endif
195 #ifdef HAVE_LIBSERVICE
196     case CDI_FILETYPE_SRV:
197       srvDefRecord(streamptr);
198       break;
199 #endif
200 #ifdef HAVE_LIBEXTRA
201     case CDI_FILETYPE_EXT:
202       extDefRecord(streamptr);
203       break;
204 #endif
205 #ifdef HAVE_LIBIEG
206     case CDI_FILETYPE_IEG:
207       iegDefRecord(streamptr);
208       break;
209 #endif
210 #ifdef HAVE_LIBNETCDF
211     case CDI_FILETYPE_NETCDF:
212       if ( streamptr->accessmode == 0 ) cdfEndDef(streamptr);
213       cdfDefRecord(streamptr);
214       break;
215 #endif
216     default:
217       Error("%s support not compiled in!", strfiletype(streamptr->filetype));
218       break;
219     }
220 }
221 
222 
streamCopyRecord(int streamID2,int streamID1)223 void streamCopyRecord(int streamID2, int streamID1)
224 {
225   stream_t *streamptr1 = stream_to_pointer(streamID1),
226     *streamptr2 = stream_to_pointer(streamID2);
227   int filetype1 = streamptr1->filetype,
228     filetype2 = streamptr2->filetype,
229     filetype  = CDI_FILETYPE_UNDEF;
230 
231   if (cdiBaseFiletype(filetype1) == cdiBaseFiletype(filetype2)) filetype = filetype2;
232 
233   if ( filetype == CDI_FILETYPE_UNDEF )
234     Error("Streams have different file types (%s -> %s)!", strfiletype(filetype1), strfiletype(filetype2));
235 
236   switch (cdiBaseFiletype(filetype))
237     {
238 #ifdef HAVE_LIBGRIB
239     case CDI_FILETYPE_GRB:
240     case CDI_FILETYPE_GRB2:
241       grbCopyRecord(streamptr2, streamptr1);
242       break;
243 #endif
244 #ifdef HAVE_LIBSERVICE
245     case CDI_FILETYPE_SRV:
246       srvCopyRecord(streamptr2, streamptr1);
247       break;
248 #endif
249 #ifdef HAVE_LIBEXTRA
250     case CDI_FILETYPE_EXT:
251       extCopyRecord(streamptr2, streamptr1);
252       break;
253 #endif
254 #ifdef HAVE_LIBIEG
255     case CDI_FILETYPE_IEG:
256       iegCopyRecord(streamptr2, streamptr1);
257       break;
258 #endif
259 #ifdef HAVE_LIBNETCDF
260     case CDI_FILETYPE_NETCDF:
261       cdfCopyRecord(streamptr2, streamptr1);
262       break;
263 #endif
264     default:
265       {
266 	Error("%s support not compiled in!", strfiletype(filetype));
267 	break;
268       }
269     }
270 }
271 
272 
cdi_create_records(stream_t * streamptr,int tsID)273 void cdi_create_records(stream_t *streamptr, int tsID)
274 {
275   unsigned nrecords, maxrecords;
276 
277   tsteps_t *sourceTstep = streamptr->tsteps;
278   tsteps_t *destTstep = sourceTstep + tsID;
279 
280   if ( destTstep->records ) return;
281 
282   int vlistID = streamptr->vlistID;
283 
284   if ( tsID == 0 )
285     {
286       maxrecords = 0;
287       int nvars = streamptr->nvars;
288       for ( int varID = 0; varID < nvars; varID++)
289         for (int isub=0; isub<streamptr->vars[varID].subtypeSize; isub++)
290           maxrecords += (unsigned)streamptr->vars[varID].recordTable[isub].nlevs;
291     }
292   else
293     {
294       maxrecords = (unsigned)sourceTstep->recordSize;
295     }
296 
297   if ( tsID == 0 )
298     {
299       nrecords = maxrecords;
300     }
301   else if ( tsID == 1 )
302     {
303       nrecords = 0;
304       maxrecords = (unsigned)sourceTstep->recordSize;
305       for ( unsigned recID = 0; recID < maxrecords; recID++ )
306 	{
307 	  int varID = sourceTstep->records[recID].varID;
308 	  nrecords += (varID == CDI_UNDEFID /* varID = CDI_UNDEFID for write mode !!! */
309                        || vlistInqVarTimetype(vlistID, varID) != TIME_CONSTANT);
310           //    printf("varID nrecords %d %d %d \n", varID, nrecords, vlistInqVarTsteptype(vlistID, varID));
311 	}
312     }
313   else
314     {
315       nrecords = (unsigned)streamptr->tsteps[1].nallrecs;
316     }
317   //  printf("tsID, nrecords %d %d\n", tsID, nrecords);
318 
319   record_t *records = NULL;
320   if ( maxrecords > 0 ) records = (record_t *) Malloc(maxrecords*sizeof(record_t));
321 
322   destTstep->records    = records;
323   destTstep->recordSize = (int)maxrecords;
324   destTstep->nallrecs   = (int)nrecords;
325 
326   if ( tsID == 0 )
327     {
328       for ( unsigned recID = 0; recID < maxrecords; recID++ )
329         recordInitEntry(&destTstep->records[recID]);
330     }
331   else
332     {
333       memcpy(destTstep->records, sourceTstep->records, (size_t)maxrecords*sizeof(record_t));
334 
335       for ( unsigned recID = 0; recID < maxrecords; recID++ )
336 	{
337           record_t *curRecord = &sourceTstep->records[recID];
338           destTstep->records[recID].used = curRecord->used;
339           if ( curRecord->used != CDI_UNDEFID && curRecord->varID != -1 ) /* curRecord->varID = -1 for write mode !!! */
340             {
341               if ( vlistInqVarTimetype(vlistID, curRecord->varID) != TIME_CONSTANT )
342                 {
343                   destTstep->records[recID].position = CDI_UNDEFID;
344                   destTstep->records[recID].size     = 0;
345                   destTstep->records[recID].used     = false;
346                 }
347             }
348 	}
349     }
350 }
351 
352 #include "file.h"
353 
streamFCopyRecord(stream_t * streamptr2,stream_t * streamptr1,const char * container_name)354 void streamFCopyRecord(stream_t *streamptr2, stream_t *streamptr1, const char *container_name)
355 {
356   int fileID1 = streamptr1->fileID;
357   int fileID2 = streamptr2->fileID;
358 
359   int tsID    = streamptr1->curTsID;
360   int vrecID  = streamptr1->tsteps[tsID].curRecID;
361   int recID   = streamptr1->tsteps[tsID].recIDs[vrecID];
362   off_t recpos  = streamptr1->tsteps[tsID].records[recID].position;
363   size_t recsize = streamptr1->tsteps[tsID].records[recID].size;
364 
365   if (fileSetPos(fileID1, recpos, SEEK_SET) != 0)
366     Error("Cannot seek input file for %s record copy!", container_name);
367 
368   char *buffer = (char *) Malloc(recsize);
369 
370   if (fileRead(fileID1, buffer, recsize) != recsize)
371     Error("Failed to read record from %s file for copying!", container_name);
372 
373   if (fileWrite(fileID2, buffer, recsize) != recsize)
374     Error("Failed to write record to %s file when copying!", container_name);
375 
376   Free(buffer);
377 }
378 /*
379  * Local Variables:
380  * c-file-style: "Java"
381  * c-basic-offset: 2
382  * indent-tabs-mode: nil
383  * show-trailing-whitespace: t
384  * require-trailing-newline: t
385  * End:
386  */
387