1 #ifdef HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4 
5 #ifdef HAVE_LIBGRIB
6 
7 #ifdef HAVE_LIBFDB5
8 #include "cdi_fdb.h"
9 #endif
10 
11 #include "async_worker.h"
12 #include "dmemory.h"
13 #include "cdi.h"
14 #include "cdi_int.h"
15 #include "stream_cgribex.h"
16 #include "stream_grb.h"
17 #include "stream_gribapi.h"
18 #include "file.h"
19 #include "cgribex.h"  /* gribZip gribGetZip gribGinfo */
20 
21 
22 static
grbDecode(int filetype,int memtype,void * cgribexp,void * gribbuffer,size_t gribsize,void * data,size_t datasize,int unreduced,size_t * nmiss,double missval)23 int grbDecode(int filetype, int memtype, void *cgribexp, void *gribbuffer, size_t gribsize, void *data, size_t datasize,
24 	      int unreduced, size_t *nmiss, double missval)
25 {
26   int status = 0;
27 
28 #ifdef HAVE_LIBCGRIBEX
29   if ( filetype == CDI_FILETYPE_GRB && !CDI_gribapi_grib1 )
30     {
31 #ifdef HAVE_LIBGRIB_API
32       extern int cdiNAdditionalGRIBKeys;
33       if ( cdiNAdditionalGRIBKeys > 0 )
34 	Error("CGRIBEX decode does not support reading of additional GRIB keys!");
35 #endif
36       status = cgribexDecode(memtype, cgribexp, gribbuffer, gribsize, data, datasize, unreduced, nmiss, missval);
37     }
38   else
39 #endif
40 #ifdef HAVE_LIBGRIB_API
41     {
42       void *datap = (memtype == MEMTYPE_FLOAT) ? Malloc(datasize*sizeof(double)) : data;
43 
44       status = gribapiDecode(gribbuffer, gribsize, datap, datasize, unreduced, nmiss, missval);
45 
46       if ( memtype == MEMTYPE_FLOAT )
47         {
48           float *dataf = (float*) data;
49           double *datad = (double*) datap;
50           for ( size_t i = 0; i < datasize; ++i ) dataf[i] = (float) datad[i];
51           Free(datap);
52         }
53     }
54 #else
55     {
56       Error("ecCodes support not compiled in!");
57     }
58 #endif
59 
60   return status;
61 }
62 
63 // Decompresses the grib data in gribbuffer.
64 static
grbUnzipRecord(void * gribbuffer,size_t * gribsize)65 int grbUnzipRecord(void *gribbuffer, size_t *gribsize)
66 {
67   int zip = 0;
68 
69   const size_t igribsize = *gribsize;
70   size_t ogribsize = *gribsize;
71 
72   int izip;
73   size_t unzipsize;
74   if ( (izip = gribGetZip(igribsize, (unsigned char *)gribbuffer, &unzipsize)) > 0 )
75     {
76       zip = izip;
77       if ( izip == 128 ) /* szip */
78 	{
79 	  if ( unzipsize < igribsize )
80 	    {
81 	      fprintf(stderr, "Decompressed size smaller than compressed size (in %zu; out %zu)!\n", igribsize, unzipsize);
82 	      return 0;
83 	    }
84 
85 	  unzipsize += 100; /* need 0 to 1 bytes for rounding of bds */
86 
87 	  void *buffer = Malloc(igribsize);
88 	  memcpy(buffer, gribbuffer, igribsize);
89 
90 	  ogribsize = (size_t)gribUnzip((unsigned char*)gribbuffer, (long)unzipsize, (unsigned char*)buffer, (long)igribsize);
91 
92           Free(buffer);
93 
94 	  if ( ogribsize <= 0 ) Error("Decompression problem!");
95 	}
96       else
97 	{
98 	  Error("Decompression for %d not implemented!", izip);
99 	}
100     }
101 
102   *gribsize = ogribsize;
103 
104   return zip;
105 }
106 
107 
108 typedef struct DecodeArgs {
109   int recID, *outZip, filetype, memtype, unreduced;
110   void *cgribexp, *gribbuffer, *data;
111   size_t recsize, gridsize, nmiss;
112   double missval;
113 } DecodeArgs;
114 
115 
116 static
grb_decode_record(void * untypedArgs)117 int grb_decode_record(void *untypedArgs)
118 {
119   DecodeArgs *args = (DecodeArgs *)untypedArgs;
120   *args->outZip = grbUnzipRecord(args->gribbuffer, &args->recsize);
121   grbDecode(args->filetype, args->memtype, args->cgribexp, args->gribbuffer, args->recsize, args->data, args->gridsize,
122             args->unreduced, &args->nmiss, args->missval);
123   return 0;
124 }
125 
126 static
grb_read_raw_data(stream_t * streamptr,int recID,int memtype,void * gribbuffer,void * data,bool resetFilePos)127 DecodeArgs grb_read_raw_data(stream_t *streamptr, int recID, int memtype, void *gribbuffer, void *data, bool resetFilePos)
128 {
129   const int vlistID = streamptr->vlistID;
130   const int tsID = streamptr->curTsID;	// FIXME: This should be looked up from the given recID
131   const int varID = streamptr->tsteps[tsID].records[recID].varID;
132   size_t recsize = streamptr->tsteps[tsID].records[recID].size;
133 
134   const int gridID = vlistInqVarGrid(vlistID, varID);
135   const size_t gridsize = gridInqSize(gridID);
136   if (CDI_Debug) Message("gridID = %d gridsize = %zu", gridID, gridsize);
137 
138   void *cgribexp = (gribbuffer && streamptr->record->cgribexp) ? streamptr->record->cgribexp : NULL;
139   if (!gribbuffer) gribbuffer = Malloc(streamptr->record->buffersize);
140   if (!data) data = Malloc(gridsize * (memtype == MEMTYPE_FLOAT ? sizeof(float) : sizeof(double)));
141 
142   if (streamptr->fdbRetrieve)
143     {
144 #ifdef HAVE_LIBFDB5
145       void *fdbItem = streamptr->tsteps[tsID].records[recID].fdbItem;
146       if (!fdbItem) Error("fdbItem not available!");
147 
148       recsize = fdb_read_record(streamptr->fdbHandle, fdbItem, &(streamptr->record->buffersize), &gribbuffer);
149 #endif
150     }
151   else
152     {
153       const int fileID = streamptr->fileID;
154       const off_t recpos = streamptr->tsteps[tsID].records[recID].position;
155 
156       if (recsize == 0) Error("Internal problem! Recordsize is zero for record %d at timestep %d", recID+1, tsID+1);
157 
158       if (resetFilePos)
159         {
160           const off_t currentfilepos = fileGetPos(fileID);
161           fileSetPos(fileID, recpos, SEEK_SET);
162           if (fileRead(fileID, gribbuffer, recsize) != recsize) Error("Failed to read GRIB record!");
163           fileSetPos(fileID, currentfilepos, SEEK_SET);
164         }
165       else
166         {
167           fileSetPos(fileID, recpos, SEEK_SET);
168           if (fileRead(fileID, gribbuffer, recsize) != recsize) Error("Failed to read GRIB record!");
169           streamptr->numvals += gridsize;
170         }
171     }
172 
173   return (DecodeArgs){
174     .recID = recID,
175     .outZip = &streamptr->tsteps[tsID].records[recID].zip,
176     .filetype = streamptr->filetype,
177     .memtype = memtype,
178     .cgribexp = cgribexp,
179     .gribbuffer = gribbuffer,
180     .recsize = recsize,
181     .data = data,
182     .gridsize = gridsize,
183     .unreduced = streamptr->unreduced,
184     .nmiss = 0,
185     .missval = vlistInqVarMissval(vlistID, varID),
186   };
187 }
188 
189 typedef struct JobDescriptor {
190   DecodeArgs args;
191   AsyncJob *job;
192 } JobDescriptor;
193 
194 static
JobDescriptor_startJob(AsyncManager * jobManager,JobDescriptor * me,stream_t * streamptr,int recID,int memtype,bool resetFilePos)195 void JobDescriptor_startJob(AsyncManager *jobManager, JobDescriptor *me, stream_t *streamptr, int recID, int memtype, bool resetFilePos)
196 {
197   me->args = grb_read_raw_data(streamptr, recID, memtype, NULL, NULL, resetFilePos);
198   me->job = AsyncWorker_requestWork(jobManager, grb_decode_record, &me->args);
199   if (!me->job) xabort("error while trying to send job to worker thread");
200 }
201 
202 static
JobDescriptor_finishJob(AsyncManager * jobManager,JobDescriptor * me,void * data,size_t * nmiss)203 void JobDescriptor_finishJob(AsyncManager *jobManager, JobDescriptor *me, void *data, size_t *nmiss)
204 {
205   if (AsyncWorker_wait(jobManager, me->job)) xabort("error executing job in worker thread");
206   memcpy(data, me->args.data, me->args.gridsize*(me->args.memtype == MEMTYPE_FLOAT ? sizeof(float) : sizeof(double)));
207   *nmiss = me->args.nmiss;
208 
209   Free(me->args.gribbuffer);
210   Free(me->args.data);
211   me->args.recID = -1;	// mark as inactive
212 }
213 
214 static
grb_read_next_record(stream_t * streamptr,int recID,int memtype,void * data,size_t * nmiss,bool resetFilePos)215 void grb_read_next_record(stream_t *streamptr, int recID, int memtype, void *data, size_t *nmiss, bool resetFilePos)
216 {
217   bool jobFound = false;
218 
219   int workerCount = streamptr->numWorker;
220   if (workerCount > 0)
221     {
222       if (workerCount > streamptr->tsteps[0].nrecs) workerCount = streamptr->tsteps[0].nrecs;
223 
224       AsyncManager *jobManager = (AsyncManager *) streamptr->jobManager;
225       JobDescriptor *jobs = (JobDescriptor *) streamptr->jobs;
226 
227       // if this is the first call, init and start worker threads
228       tsteps_t *timestep = &streamptr->tsteps[streamptr->curTsID];
229 
230       if (!jobs)
231         {
232           jobs = (JobDescriptor *)malloc(workerCount*sizeof*jobs);
233           streamptr->jobs = jobs;
234           for (int i = 0; i < workerCount; i++) jobs[i].args.recID = -1;
235           if (AsyncWorker_init(&jobManager, workerCount)) xabort("error while trying to start worker threads");
236           streamptr->jobManager = jobManager;
237         }
238 
239       if (recID == 0) streamptr->nextRecID = 0;
240       if (recID == 0) streamptr->cachedTsID = streamptr->curTsID; // no active workers -> we may start processing records of a new timestep
241 
242       if (streamptr->cachedTsID == streamptr->curTsID)
243         {
244           // Start as many new jobs as possible.
245           for (int i = 0; streamptr->nextRecID < timestep->nrecs && i < workerCount; i++)
246             {
247               JobDescriptor *jd = &jobs[i];
248               if (jd->args.recID < 0)
249                 {
250                   JobDescriptor_startJob(jobManager, jd, streamptr, timestep->recIDs[streamptr->nextRecID++], memtype, resetFilePos);
251                 }
252             }
253 
254           // search for a job descriptor with the given recID, and use its results if it exists
255           for (int i = 0; !jobFound && i < workerCount; i++)
256             {
257               JobDescriptor *jd = &jobs[i];
258               if (jd->args.recID == recID)
259                 {
260                   jobFound = true;
261                   JobDescriptor_finishJob(jobManager, jd, data, nmiss);
262                   if (streamptr->nextRecID < timestep->nrecs)
263                     {
264                       JobDescriptor_startJob(jobManager, jd, streamptr, timestep->recIDs[streamptr->nextRecID++], memtype, resetFilePos);
265                     }
266                 }
267             }
268         }
269     }
270 
271   // perform the work synchronously if we didn't start a job for it yet
272   if (!jobFound)
273     {
274       DecodeArgs args = grb_read_raw_data(streamptr, recID, memtype, streamptr->record->buffer, data, resetFilePos);
275       grb_decode_record(&args);
276       *nmiss = args.nmiss;
277     }
278 }
279 
280 
grb_read_record(stream_t * streamptr,int memtype,void * data,size_t * nmiss)281 void grb_read_record(stream_t *streamptr, int memtype, void *data, size_t *nmiss)
282 {
283   const int tsID = streamptr->curTsID;
284   const int vrecID = streamptr->tsteps[tsID].curRecID;
285   const int recID = streamptr->tsteps[tsID].recIDs[vrecID];
286 
287   grb_read_next_record(streamptr, recID, memtype, data, nmiss, false);
288 }
289 
290 
grb_read_var_slice(stream_t * streamptr,int varID,int levelID,int memtype,void * data,size_t * nmiss)291 void grb_read_var_slice(stream_t *streamptr, int varID, int levelID, int memtype, void *data, size_t *nmiss)
292 {
293   const int isub = subtypeInqActiveIndex(streamptr->vars[varID].subtypeID);
294   const int recID = streamptr->vars[varID].recordTable[isub].recordID[levelID];
295 
296   grb_read_next_record(streamptr, recID, memtype, data, nmiss, true);
297 }
298 
299 
grb_read_var(stream_t * streamptr,int varID,int memtype,void * data,size_t * nmiss)300 void grb_read_var(stream_t *streamptr, int varID, int memtype, void *data, size_t *nmiss)
301 {
302   const int vlistID = streamptr->vlistID;
303   const int fileID = streamptr->fileID;
304 
305   const int gridID = vlistInqVarGrid(vlistID, varID);
306   const size_t gridsize = gridInqSize(gridID);
307 
308   const off_t currentfilepos = fileGetPos(fileID);
309 
310   const int isub = subtypeInqActiveIndex(streamptr->vars[varID].subtypeID);
311   const int nlevs = streamptr->vars[varID].recordTable[0].nlevs;
312 
313   if ( CDI_Debug ) Message("nlevs = %d gridID = %d gridsize = %zu", nlevs, gridID, gridsize);
314 
315   *nmiss = 0;
316   for (int levelID = 0; levelID < nlevs; levelID++)
317     {
318       const int recID = streamptr->vars[varID].recordTable[isub].recordID[levelID];
319 
320       void *datap = NULL;
321       if ( memtype == MEMTYPE_FLOAT )
322         datap = (float*)data + levelID*gridsize;
323       else
324         datap = (double*)data + levelID*gridsize;
325 
326       size_t imiss;
327       grb_read_next_record(streamptr, recID, memtype, datap, &imiss, false);
328       *nmiss += imiss;
329     }
330 
331   fileSetPos(fileID, currentfilepos, SEEK_SET);
332 }
333 
334 #endif
335