1 #ifdef HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4
5 #ifdef HAVE_LIBGRIB
6
7 #ifdef HAVE_LIBFDB5
8 #include "cdi_fdb.h"
9 #endif
10
11 #include "async_worker.h"
12 #include "dmemory.h"
13 #include "cdi.h"
14 #include "cdi_int.h"
15 #include "stream_cgribex.h"
16 #include "stream_grb.h"
17 #include "stream_gribapi.h"
18 #include "file.h"
19 #include "cgribex.h" /* gribZip gribGetZip gribGinfo */
20
21
22 static
grbDecode(int filetype,int memtype,void * cgribexp,void * gribbuffer,size_t gribsize,void * data,size_t datasize,int unreduced,size_t * nmiss,double missval)23 int grbDecode(int filetype, int memtype, void *cgribexp, void *gribbuffer, size_t gribsize, void *data, size_t datasize,
24 int unreduced, size_t *nmiss, double missval)
25 {
26 int status = 0;
27
28 #ifdef HAVE_LIBCGRIBEX
29 if ( filetype == CDI_FILETYPE_GRB && !CDI_gribapi_grib1 )
30 {
31 #ifdef HAVE_LIBGRIB_API
32 extern int cdiNAdditionalGRIBKeys;
33 if ( cdiNAdditionalGRIBKeys > 0 )
34 Error("CGRIBEX decode does not support reading of additional GRIB keys!");
35 #endif
36 status = cgribexDecode(memtype, cgribexp, gribbuffer, gribsize, data, datasize, unreduced, nmiss, missval);
37 }
38 else
39 #endif
40 #ifdef HAVE_LIBGRIB_API
41 {
42 void *datap = (memtype == MEMTYPE_FLOAT) ? Malloc(datasize*sizeof(double)) : data;
43
44 status = gribapiDecode(gribbuffer, gribsize, datap, datasize, unreduced, nmiss, missval);
45
46 if ( memtype == MEMTYPE_FLOAT )
47 {
48 float *dataf = (float*) data;
49 double *datad = (double*) datap;
50 for ( size_t i = 0; i < datasize; ++i ) dataf[i] = (float) datad[i];
51 Free(datap);
52 }
53 }
54 #else
55 {
56 Error("ecCodes support not compiled in!");
57 }
58 #endif
59
60 return status;
61 }
62
63 // Decompresses the grib data in gribbuffer.
64 static
grbUnzipRecord(void * gribbuffer,size_t * gribsize)65 int grbUnzipRecord(void *gribbuffer, size_t *gribsize)
66 {
67 int zip = 0;
68
69 const size_t igribsize = *gribsize;
70 size_t ogribsize = *gribsize;
71
72 int izip;
73 size_t unzipsize;
74 if ( (izip = gribGetZip(igribsize, (unsigned char *)gribbuffer, &unzipsize)) > 0 )
75 {
76 zip = izip;
77 if ( izip == 128 ) /* szip */
78 {
79 if ( unzipsize < igribsize )
80 {
81 fprintf(stderr, "Decompressed size smaller than compressed size (in %zu; out %zu)!\n", igribsize, unzipsize);
82 return 0;
83 }
84
85 unzipsize += 100; /* need 0 to 1 bytes for rounding of bds */
86
87 void *buffer = Malloc(igribsize);
88 memcpy(buffer, gribbuffer, igribsize);
89
90 ogribsize = (size_t)gribUnzip((unsigned char*)gribbuffer, (long)unzipsize, (unsigned char*)buffer, (long)igribsize);
91
92 Free(buffer);
93
94 if ( ogribsize <= 0 ) Error("Decompression problem!");
95 }
96 else
97 {
98 Error("Decompression for %d not implemented!", izip);
99 }
100 }
101
102 *gribsize = ogribsize;
103
104 return zip;
105 }
106
107
108 typedef struct DecodeArgs {
109 int recID, *outZip, filetype, memtype, unreduced;
110 void *cgribexp, *gribbuffer, *data;
111 size_t recsize, gridsize, nmiss;
112 double missval;
113 } DecodeArgs;
114
115
116 static
grb_decode_record(void * untypedArgs)117 int grb_decode_record(void *untypedArgs)
118 {
119 DecodeArgs *args = (DecodeArgs *)untypedArgs;
120 *args->outZip = grbUnzipRecord(args->gribbuffer, &args->recsize);
121 grbDecode(args->filetype, args->memtype, args->cgribexp, args->gribbuffer, args->recsize, args->data, args->gridsize,
122 args->unreduced, &args->nmiss, args->missval);
123 return 0;
124 }
125
126 static
grb_read_raw_data(stream_t * streamptr,int recID,int memtype,void * gribbuffer,void * data,bool resetFilePos)127 DecodeArgs grb_read_raw_data(stream_t *streamptr, int recID, int memtype, void *gribbuffer, void *data, bool resetFilePos)
128 {
129 const int vlistID = streamptr->vlistID;
130 const int tsID = streamptr->curTsID; // FIXME: This should be looked up from the given recID
131 const int varID = streamptr->tsteps[tsID].records[recID].varID;
132 size_t recsize = streamptr->tsteps[tsID].records[recID].size;
133
134 const int gridID = vlistInqVarGrid(vlistID, varID);
135 const size_t gridsize = gridInqSize(gridID);
136 if (CDI_Debug) Message("gridID = %d gridsize = %zu", gridID, gridsize);
137
138 void *cgribexp = (gribbuffer && streamptr->record->cgribexp) ? streamptr->record->cgribexp : NULL;
139 if (!gribbuffer) gribbuffer = Malloc(streamptr->record->buffersize);
140 if (!data) data = Malloc(gridsize * (memtype == MEMTYPE_FLOAT ? sizeof(float) : sizeof(double)));
141
142 if (streamptr->fdbRetrieve)
143 {
144 #ifdef HAVE_LIBFDB5
145 void *fdbItem = streamptr->tsteps[tsID].records[recID].fdbItem;
146 if (!fdbItem) Error("fdbItem not available!");
147
148 recsize = fdb_read_record(streamptr->fdbHandle, fdbItem, &(streamptr->record->buffersize), &gribbuffer);
149 #endif
150 }
151 else
152 {
153 const int fileID = streamptr->fileID;
154 const off_t recpos = streamptr->tsteps[tsID].records[recID].position;
155
156 if (recsize == 0) Error("Internal problem! Recordsize is zero for record %d at timestep %d", recID+1, tsID+1);
157
158 if (resetFilePos)
159 {
160 const off_t currentfilepos = fileGetPos(fileID);
161 fileSetPos(fileID, recpos, SEEK_SET);
162 if (fileRead(fileID, gribbuffer, recsize) != recsize) Error("Failed to read GRIB record!");
163 fileSetPos(fileID, currentfilepos, SEEK_SET);
164 }
165 else
166 {
167 fileSetPos(fileID, recpos, SEEK_SET);
168 if (fileRead(fileID, gribbuffer, recsize) != recsize) Error("Failed to read GRIB record!");
169 streamptr->numvals += gridsize;
170 }
171 }
172
173 return (DecodeArgs){
174 .recID = recID,
175 .outZip = &streamptr->tsteps[tsID].records[recID].zip,
176 .filetype = streamptr->filetype,
177 .memtype = memtype,
178 .cgribexp = cgribexp,
179 .gribbuffer = gribbuffer,
180 .recsize = recsize,
181 .data = data,
182 .gridsize = gridsize,
183 .unreduced = streamptr->unreduced,
184 .nmiss = 0,
185 .missval = vlistInqVarMissval(vlistID, varID),
186 };
187 }
188
189 typedef struct JobDescriptor {
190 DecodeArgs args;
191 AsyncJob *job;
192 } JobDescriptor;
193
194 static
JobDescriptor_startJob(AsyncManager * jobManager,JobDescriptor * me,stream_t * streamptr,int recID,int memtype,bool resetFilePos)195 void JobDescriptor_startJob(AsyncManager *jobManager, JobDescriptor *me, stream_t *streamptr, int recID, int memtype, bool resetFilePos)
196 {
197 me->args = grb_read_raw_data(streamptr, recID, memtype, NULL, NULL, resetFilePos);
198 me->job = AsyncWorker_requestWork(jobManager, grb_decode_record, &me->args);
199 if (!me->job) xabort("error while trying to send job to worker thread");
200 }
201
202 static
JobDescriptor_finishJob(AsyncManager * jobManager,JobDescriptor * me,void * data,size_t * nmiss)203 void JobDescriptor_finishJob(AsyncManager *jobManager, JobDescriptor *me, void *data, size_t *nmiss)
204 {
205 if (AsyncWorker_wait(jobManager, me->job)) xabort("error executing job in worker thread");
206 memcpy(data, me->args.data, me->args.gridsize*(me->args.memtype == MEMTYPE_FLOAT ? sizeof(float) : sizeof(double)));
207 *nmiss = me->args.nmiss;
208
209 Free(me->args.gribbuffer);
210 Free(me->args.data);
211 me->args.recID = -1; // mark as inactive
212 }
213
214 static
grb_read_next_record(stream_t * streamptr,int recID,int memtype,void * data,size_t * nmiss,bool resetFilePos)215 void grb_read_next_record(stream_t *streamptr, int recID, int memtype, void *data, size_t *nmiss, bool resetFilePos)
216 {
217 bool jobFound = false;
218
219 int workerCount = streamptr->numWorker;
220 if (workerCount > 0)
221 {
222 if (workerCount > streamptr->tsteps[0].nrecs) workerCount = streamptr->tsteps[0].nrecs;
223
224 AsyncManager *jobManager = (AsyncManager *) streamptr->jobManager;
225 JobDescriptor *jobs = (JobDescriptor *) streamptr->jobs;
226
227 // if this is the first call, init and start worker threads
228 tsteps_t *timestep = &streamptr->tsteps[streamptr->curTsID];
229
230 if (!jobs)
231 {
232 jobs = (JobDescriptor *)malloc(workerCount*sizeof*jobs);
233 streamptr->jobs = jobs;
234 for (int i = 0; i < workerCount; i++) jobs[i].args.recID = -1;
235 if (AsyncWorker_init(&jobManager, workerCount)) xabort("error while trying to start worker threads");
236 streamptr->jobManager = jobManager;
237 }
238
239 if (recID == 0) streamptr->nextRecID = 0;
240 if (recID == 0) streamptr->cachedTsID = streamptr->curTsID; // no active workers -> we may start processing records of a new timestep
241
242 if (streamptr->cachedTsID == streamptr->curTsID)
243 {
244 // Start as many new jobs as possible.
245 for (int i = 0; streamptr->nextRecID < timestep->nrecs && i < workerCount; i++)
246 {
247 JobDescriptor *jd = &jobs[i];
248 if (jd->args.recID < 0)
249 {
250 JobDescriptor_startJob(jobManager, jd, streamptr, timestep->recIDs[streamptr->nextRecID++], memtype, resetFilePos);
251 }
252 }
253
254 // search for a job descriptor with the given recID, and use its results if it exists
255 for (int i = 0; !jobFound && i < workerCount; i++)
256 {
257 JobDescriptor *jd = &jobs[i];
258 if (jd->args.recID == recID)
259 {
260 jobFound = true;
261 JobDescriptor_finishJob(jobManager, jd, data, nmiss);
262 if (streamptr->nextRecID < timestep->nrecs)
263 {
264 JobDescriptor_startJob(jobManager, jd, streamptr, timestep->recIDs[streamptr->nextRecID++], memtype, resetFilePos);
265 }
266 }
267 }
268 }
269 }
270
271 // perform the work synchronously if we didn't start a job for it yet
272 if (!jobFound)
273 {
274 DecodeArgs args = grb_read_raw_data(streamptr, recID, memtype, streamptr->record->buffer, data, resetFilePos);
275 grb_decode_record(&args);
276 *nmiss = args.nmiss;
277 }
278 }
279
280
grb_read_record(stream_t * streamptr,int memtype,void * data,size_t * nmiss)281 void grb_read_record(stream_t *streamptr, int memtype, void *data, size_t *nmiss)
282 {
283 const int tsID = streamptr->curTsID;
284 const int vrecID = streamptr->tsteps[tsID].curRecID;
285 const int recID = streamptr->tsteps[tsID].recIDs[vrecID];
286
287 grb_read_next_record(streamptr, recID, memtype, data, nmiss, false);
288 }
289
290
grb_read_var_slice(stream_t * streamptr,int varID,int levelID,int memtype,void * data,size_t * nmiss)291 void grb_read_var_slice(stream_t *streamptr, int varID, int levelID, int memtype, void *data, size_t *nmiss)
292 {
293 const int isub = subtypeInqActiveIndex(streamptr->vars[varID].subtypeID);
294 const int recID = streamptr->vars[varID].recordTable[isub].recordID[levelID];
295
296 grb_read_next_record(streamptr, recID, memtype, data, nmiss, true);
297 }
298
299
grb_read_var(stream_t * streamptr,int varID,int memtype,void * data,size_t * nmiss)300 void grb_read_var(stream_t *streamptr, int varID, int memtype, void *data, size_t *nmiss)
301 {
302 const int vlistID = streamptr->vlistID;
303 const int fileID = streamptr->fileID;
304
305 const int gridID = vlistInqVarGrid(vlistID, varID);
306 const size_t gridsize = gridInqSize(gridID);
307
308 const off_t currentfilepos = fileGetPos(fileID);
309
310 const int isub = subtypeInqActiveIndex(streamptr->vars[varID].subtypeID);
311 const int nlevs = streamptr->vars[varID].recordTable[0].nlevs;
312
313 if ( CDI_Debug ) Message("nlevs = %d gridID = %d gridsize = %zu", nlevs, gridID, gridsize);
314
315 *nmiss = 0;
316 for (int levelID = 0; levelID < nlevs; levelID++)
317 {
318 const int recID = streamptr->vars[varID].recordTable[isub].recordID[levelID];
319
320 void *datap = NULL;
321 if ( memtype == MEMTYPE_FLOAT )
322 datap = (float*)data + levelID*gridsize;
323 else
324 datap = (double*)data + levelID*gridsize;
325
326 size_t imiss;
327 grb_read_next_record(streamptr, recID, memtype, datap, &imiss, false);
328 *nmiss += imiss;
329 }
330
331 fileSetPos(fileID, currentfilepos, SEEK_SET);
332 }
333
334 #endif
335