1 /*********************************************************************
2   Blosc - Blocked Shuffling and Compression Library
3 
4   Author: Francesc Alted <francesc@blosc.org>
5   Creation date: 2009-05-20
6 
7   See LICENSES/BLOSC.txt for details about copyright and rights to use.
8 **********************************************************************/
9 
10 
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <errno.h>
14 #include <string.h>
15 #include <sys/types.h>
16 #include <assert.h>
17 
18 #include "fastcopy.h"
19 
20 #if defined(USING_CMAKE)
21   #include "config.h"
22 #endif /*  USING_CMAKE */
23 #include "blosc.h"
24 #include "shuffle.h"
25 #include "blosclz.h"
26 #if defined(HAVE_LZ4)
27   #include "lz4.h"
28   #include "lz4hc.h"
29 #endif /*  HAVE_LZ4 */
30 #if defined(HAVE_SNAPPY)
31   #include "snappy-c.h"
32 #endif /*  HAVE_SNAPPY */
33 #if defined(HAVE_ZLIB)
34   #include "zlib.h"
35 #endif /*  HAVE_ZLIB */
36 #if defined(HAVE_ZSTD)
37   #include "zstd.h"
38 #endif /*  HAVE_ZSTD */
39 
40 #if defined(_WIN32) && !defined(__MINGW32__)
41   #include <windows.h>
42   #include <malloc.h>
43 
44   /* stdint.h only available in VS2010 (VC++ 16.0) and newer */
45   #if defined(_MSC_VER) && _MSC_VER < 1600
46     #include "win32/stdint-windows.h"
47   #else
48     #include <stdint.h>
49   #endif
50 
51   #include <process.h>
52   #define getpid _getpid
53 #else
54   #include <stdint.h>
55   #include <unistd.h>
56   #include <inttypes.h>
57 #endif  /* _WIN32 */
58 
59 /* Include the win32/pthread.h library for all the Windows builds. See #224. */
60 #if defined(_WIN32)
61   #include "win32/pthread.h"
62   #include "win32/pthread.c"
63 #else
64   #include <pthread.h>
65 #endif
66 
67 
/* Some handy size units */
#define KB 1024
#define MB ((KB) * 1024)

/* Minimum buffer size to be compressed (cannot be smaller than 66) */
#define MIN_BUFFERSIZE 128

/* Maximum number of splits in a block for compression
   (cannot be larger than 128) */
#define MAX_SPLITS 16

/* The size of L1 cache.  32 KB is quite common nowadays. */
#define L1 ((KB) * 32)

/* POSIX barriers are only enabled when _POSIX_BARRIERS is defined, is not
   below 20012L and is different from 200112L, because problems were seen
   with that last value.  This requires more investigation, but will work
   for the moment.
   NOTE(review): 20012L looks like a typo for 200112L -- confirm. */
#if defined(_POSIX_BARRIERS)
  #if (_POSIX_BARRIERS - 20012L) >= 0 && _POSIX_BARRIERS != 200112L
    #define _POSIX_BARRIERS_MINE
  #endif
#endif
86 /* Synchronization variables */
87 
88 
89 struct blosc_context {
90   int32_t compress;               /* 1 if we are doing compression 0 if decompress */
91 
92   const uint8_t* src;
93   uint8_t* dest;                  /* The current pos in the destination buffer */
94   uint8_t* header_flags;          /* Flags for header */
95   int compversion;                /* Compressor version byte, only used during decompression */
96   int32_t sourcesize;             /* Number of bytes in source buffer (or uncompressed bytes in compressed file) */
97   int32_t compressedsize;         /* Number of bytes of compressed data (only used when decompressing) */
98   int32_t nblocks;                /* Number of total blocks in buffer */
99   int32_t leftover;               /* Extra bytes at end of buffer */
100   int32_t blocksize;              /* Length of the block in bytes */
101   int32_t typesize;               /* Type size */
102   int32_t num_output_bytes;       /* Counter for the number of output bytes */
103   int32_t destsize;               /* Maximum size for destination buffer */
104   uint8_t* bstarts;               /* Start of the buffer past header info */
105   int32_t compcode;               /* Compressor code to use */
106   int clevel;                     /* Compression level (1-9) */
107   /* Function to use for decompression.  Only used when decompression */
108   int (*decompress_func)(const void* input, int compressed_length, void* output,
109                          int maxout);
110 
111   /* Threading */
112   int32_t numthreads;
113   int32_t threads_started;
114   int32_t end_threads;
115   pthread_t threads[BLOSC_MAX_THREADS];
116   int32_t tids[BLOSC_MAX_THREADS];
117   pthread_mutex_t count_mutex;
118   #ifdef _POSIX_BARRIERS_MINE
119   pthread_barrier_t barr_init;
120   pthread_barrier_t barr_finish;
121   #else
122   int32_t count_threads;
123   pthread_mutex_t count_threads_mutex;
124   pthread_cond_t count_threads_cv;
125   #endif
126   #if !defined(_WIN32)
127   pthread_attr_t ct_attr;            /* creation time attrs for threads */
128   #endif
129   int32_t thread_giveup_code;               /* error code when give up */
130   int32_t thread_nblock;                    /* block counter */
131 };
132 
/* Per-thread state: a link to the owning context, the thread id and the
   temporary buffers of that thread. */
struct thread_context {
  struct blosc_context *parent_context;
  int32_t tid;
  uint8_t *tmp;
  uint8_t *tmp2;
  uint8_t *tmp3;
  /* Used to keep track of how big the temporary buffers are */
  int32_t tmpblocksize;
};
141 
142 /* Global context for non-contextual API */
143 static struct blosc_context* g_global_context;
144 static pthread_mutex_t* global_comp_mutex;
145 static int32_t g_compressor = BLOSC_BLOSCLZ;  /* the compressor to use by default */
146 static int32_t g_threads = 1;
147 static int32_t g_force_blocksize = 0;
148 static int32_t g_initlib = 0;
149 static int32_t g_atfork_registered = 0;
150 static int32_t g_splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
151 
152 
153 
/* Wrapped function to adjust the number of threads used by blosc;
   it operates on the given context. */
int blosc_set_nthreads_(struct blosc_context *context);

/* Releases the global threadpool */
int blosc_release_threadpool(struct blosc_context *context);
159 
160 /* Macros for synchronization */
161 
/* Wait until all threads are initialized.
 *
 * RET_VAL is the value returned from the *enclosing function* when waiting
 * on the barrier fails (it is only used by the barrier variant).
 * CONTEXT_PTR is a pointer to the struct blosc_context to synchronize on.
 *
 * The barrier variant assigns to a variable named `rc` that must be
 * declared by the caller.  The expansion is a single statement, so the
 * macro must be followed by a semicolon and is safe inside an unbraced
 * if/else.
 */
#ifdef _POSIX_BARRIERS_MINE
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                                   \
  do {                                                                    \
    rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_init);                 \
    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {                 \
      printf("Could not wait on barrier (init): %d\n", rc);               \
      return((RET_VAL));                                                  \
    }                                                                     \
  } while (0)
#else
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                                   \
  do {                                                                    \
    pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);              \
    if ((CONTEXT_PTR)->count_threads < (CONTEXT_PTR)->numthreads) {       \
      /* Not the last one to arrive: register and sleep */                \
      (CONTEXT_PTR)->count_threads++;                                     \
      pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,                 \
                        &(CONTEXT_PTR)->count_threads_mutex);             \
    }                                                                     \
    else {                                                                \
      /* Everybody is here: wake up the waiters */                        \
      pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv);           \
    }                                                                     \
    pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);            \
  } while (0)
#endif
182 
/* Wait for all threads to finish.
 *
 * RET_VAL is the value returned from the *enclosing function* when waiting
 * on the barrier fails (it is only used by the barrier variant).
 * CONTEXT_PTR is a pointer to the struct blosc_context to synchronize on.
 *
 * The barrier variant assigns to a variable named `rc` that must be
 * declared by the caller.  The expansion is a single statement, so the
 * macro must be followed by a semicolon and is safe inside an unbraced
 * if/else.
 */
#ifdef _POSIX_BARRIERS_MINE
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                                 \
  do {                                                                    \
    rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_finish);               \
    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {                 \
      printf("Could not wait on barrier (finish)\n");                     \
      return((RET_VAL));                                                  \
    }                                                                     \
  } while (0)
#else
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                                 \
  do {                                                                    \
    pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);              \
    if ((CONTEXT_PTR)->count_threads > 0) {                               \
      /* Others are still running: deregister and sleep */                \
      (CONTEXT_PTR)->count_threads--;                                     \
      pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,                 \
                        &(CONTEXT_PTR)->count_threads_mutex);             \
    }                                                                     \
    else {                                                                \
      /* Last one to finish: wake up the waiters */                       \
      pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv);           \
    }                                                                     \
    pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);            \
  } while (0)
#endif
203 
204 
/* Portable allocation of `size` bytes.
 *
 * On Windows and on platforms advertising posix_memalign() the block is
 * aligned to 32 bytes (because AVX2 is supported); otherwise plain
 * malloc() is used.  Returns NULL, after printing a message, when the
 * allocation fails.  Release the block with my_free().
 */
static uint8_t *my_malloc(size_t size)
{
  enum { ALIGNMENT = 32 };
  void *mem = NULL;
  int status = 0;

#if defined(_WIN32)
  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
  mem = (void *)_aligned_malloc(size, ALIGNMENT);
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
  /* Platform does have an implementation of posix_memalign */
  status = posix_memalign(&mem, ALIGNMENT, size);
#else
  mem = malloc(size);
#endif  /* _WIN32 */

  if (status != 0 || mem == NULL) {
    printf("Error allocating memory!");
    return NULL;
  }

  return (uint8_t *)mem;
}
229 
230 
/* Give back a block obtained from my_malloc(), using the deallocator that
   matches the platform's allocator. */
static void my_free(void *block)
{
#if !defined(_WIN32)
  free(block);
#else
  _aligned_free(block);
#endif  /* !_WIN32 */
}
240 
241 
/* Read the 4 bytes at `pa` as a little-endian 32-bit integer and return it
   as an int32_t, whatever the endianness of the host is. */
static int32_t sw32_(const uint8_t *pa)
{
  /* Assemble the value arithmetically: pa[0] is the least significant byte */
  const uint32_t bits = (uint32_t)pa[0]
                      | ((uint32_t)pa[1] << 8)
                      | ((uint32_t)pa[2] << 16)
                      | ((uint32_t)pa[3] << 24);
  int32_t value;

  /* Reinterpret the bit pattern without a signed conversion */
  memcpy(&value, &bits, sizeof(value));
  return value;
}
266 
267 
/* Store the 32-bit integer `a` into the 4 bytes at `dest` in little-endian
   order, whatever the endianness of the host is. */
static void _sw32(uint8_t* dest, int32_t a)
{
  /* Work on the unsigned bit pattern so that shifts are well defined */
  const uint32_t bits = (uint32_t)a;
  int k;

  for (k = 0; k < 4; k++) {
    /* dest[0] receives the least significant byte */
    dest[k] = (uint8_t)((bits >> (8 * k)) & 0xffu);
  }
}
290 
291 /*
292  * Conversion routines between compressor and compression libraries
293  */
294 
295 /* Return the library code associated with the compressor name */
compname_to_clibcode(const char * compname)296 static int compname_to_clibcode(const char *compname)
297 {
298   if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
299     return BLOSC_BLOSCLZ_LIB;
300   if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
301     return BLOSC_LZ4_LIB;
302   if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
303     return BLOSC_LZ4_LIB;
304   if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0)
305     return BLOSC_SNAPPY_LIB;
306   if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
307     return BLOSC_ZLIB_LIB;
308   if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
309     return BLOSC_ZSTD_LIB;
310   return -1;
311 }
312 
313 /* Return the library name associated with the compressor code */
clibcode_to_clibname(int clibcode)314 static const char *clibcode_to_clibname(int clibcode)
315 {
316   if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
317   if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
318   if (clibcode == BLOSC_SNAPPY_LIB) return BLOSC_SNAPPY_LIBNAME;
319   if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
320   if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
321   return NULL;                  /* should never happen */
322 }
323 
324 
325 /*
326  * Conversion routines between compressor names and compressor codes
327  */
328 
329 /* Get the compressor name associated with the compressor code */
blosc_compcode_to_compname(int compcode,const char ** compname)330 int blosc_compcode_to_compname(int compcode, const char **compname)
331 {
332   int code = -1;    /* -1 means non-existent compressor code */
333   const char *name = NULL;
334 
335   /* Map the compressor code */
336   if (compcode == BLOSC_BLOSCLZ)
337     name = BLOSC_BLOSCLZ_COMPNAME;
338   else if (compcode == BLOSC_LZ4)
339     name = BLOSC_LZ4_COMPNAME;
340   else if (compcode == BLOSC_LZ4HC)
341     name = BLOSC_LZ4HC_COMPNAME;
342   else if (compcode == BLOSC_SNAPPY)
343     name = BLOSC_SNAPPY_COMPNAME;
344   else if (compcode == BLOSC_ZLIB)
345     name = BLOSC_ZLIB_COMPNAME;
346   else if (compcode == BLOSC_ZSTD)
347     name = BLOSC_ZSTD_COMPNAME;
348 
349   *compname = name;
350 
351   /* Guess if there is support for this code */
352   if (compcode == BLOSC_BLOSCLZ)
353     code = BLOSC_BLOSCLZ;
354 #if defined(HAVE_LZ4)
355   else if (compcode == BLOSC_LZ4)
356     code = BLOSC_LZ4;
357   else if (compcode == BLOSC_LZ4HC)
358     code = BLOSC_LZ4HC;
359 #endif /*  HAVE_LZ4 */
360 #if defined(HAVE_SNAPPY)
361   else if (compcode == BLOSC_SNAPPY)
362     code = BLOSC_SNAPPY;
363 #endif /*  HAVE_SNAPPY */
364 #if defined(HAVE_ZLIB)
365   else if (compcode == BLOSC_ZLIB)
366     code = BLOSC_ZLIB;
367 #endif /*  HAVE_ZLIB */
368 #if defined(HAVE_ZSTD)
369   else if (compcode == BLOSC_ZSTD)
370     code = BLOSC_ZSTD;
371 #endif /*  HAVE_ZSTD */
372 
373   return code;
374 }
375 
376 /* Get the compressor code for the compressor name. -1 if it is not available */
blosc_compname_to_compcode(const char * compname)377 int blosc_compname_to_compcode(const char *compname)
378 {
379   int code = -1;  /* -1 means non-existent compressor code */
380 
381   if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
382     code = BLOSC_BLOSCLZ;
383   }
384 #if defined(HAVE_LZ4)
385   else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
386     code = BLOSC_LZ4;
387   }
388   else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
389     code = BLOSC_LZ4HC;
390   }
391 #endif /*  HAVE_LZ4 */
392 #if defined(HAVE_SNAPPY)
393   else if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0) {
394     code = BLOSC_SNAPPY;
395   }
396 #endif /*  HAVE_SNAPPY */
397 #if defined(HAVE_ZLIB)
398   else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
399     code = BLOSC_ZLIB;
400   }
401 #endif /*  HAVE_ZLIB */
402 #if defined(HAVE_ZSTD)
403   else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
404     code = BLOSC_ZSTD;
405   }
406 #endif /*  HAVE_ZSTD */
407 
408 return code;
409 }
410 
411 
412 #if defined(HAVE_LZ4)
/* Adapter around LZ4_compress_fast(): sizes are narrowed to int and the
   library's return value is handed back untouched. */
static int lz4_wrap_compress(const char* input, size_t input_length,
                             char* output, size_t maxout, int accel)
{
  const int srclen = (int)input_length;
  const int dstcap = (int)maxout;

  return LZ4_compress_fast(input, output, srclen, dstcap, accel);
}
421 
/* Adapter around LZ4_compress_HC().  Returns -1 for inputs larger than
   2 GB; otherwise the library's return value is handed back untouched. */
static int lz4hc_wrap_compress(const char* input, size_t input_length,
                               char* output, size_t maxout, int clevel)
{
  const size_t limit = (size_t)(UINT32_C(2)<<30);

  if (input_length > limit) {
    return -1;   /* input larger than 2 GB is not supported */
  }
  /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
   * but levels larger than 9 do not buy much compression. */
  return LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
                         clevel);
}
434 
/* Adapter giving LZ4_decompress_safe() the signature expected for
   `decompress_func`. */
static int lz4_wrap_decompress(const void* input, int compressed_length,
                               void* output, int maxout)
{
  int nbytes;

  nbytes = LZ4_decompress_safe(input, output, compressed_length, maxout);
  return nbytes;
}
440 
441 #endif /* HAVE_LZ4 */
442 
443 #if defined(HAVE_SNAPPY)
snappy_wrap_compress(const char * input,size_t input_length,char * output,size_t maxout)444 static int snappy_wrap_compress(const char* input, size_t input_length,
445                                 char* output, size_t maxout)
446 {
447   snappy_status status;
448   size_t cl = maxout;
449   status = snappy_compress(input, input_length, output, &cl);
450   if (status != SNAPPY_OK){
451     return 0;
452   }
453   return (int)cl;
454 }
455 
snappy_wrap_decompress(const void * input,int compressed_length,void * output,int maxout)456 static int snappy_wrap_decompress(const void* input, int compressed_length,
457                                   void* output, int maxout)
458 {
459   snappy_status status;
460   size_t ul = maxout;
461   status = snappy_uncompress(input, compressed_length, output, &ul);
462   if (status != SNAPPY_OK){
463     return 0;
464   }
465   return (int)ul;
466 }
467 #endif /* HAVE_SNAPPY */
468 
469 #if defined(HAVE_ZLIB)
470 /* zlib is not very respectful with sharing name space with others.
471  Fortunately, its names do not collide with those already in blosc. */
zlib_wrap_compress(const char * input,size_t input_length,char * output,size_t maxout,int clevel)472 static int zlib_wrap_compress(const char* input, size_t input_length,
473                               char* output, size_t maxout, int clevel)
474 {
475   int status;
476   uLongf cl = maxout;
477   status = compress2(
478              (Bytef*)output, &cl, (Bytef*)input, (uLong)input_length, clevel);
479   if (status != Z_OK){
480     return 0;
481   }
482   return (int)cl;
483 }
484 
zlib_wrap_decompress(const void * input,int compressed_length,void * output,int maxout)485 static int zlib_wrap_decompress(const void* input, int compressed_length,
486                                 void* output, int maxout) {
487   int status;
488   uLongf ul = maxout;
489   status = uncompress(
490              (Bytef*)output, &ul, (Bytef*)input, (uLong)compressed_length);
491   if (status != Z_OK){
492     return 0;
493   }
494   return (int)ul;
495 }
496 #endif /*  HAVE_ZLIB */
497 
498 #if defined(HAVE_ZSTD)
/* Adapter around ZSTD_compress().
 *
 * The Blosc compression level `clevel` is translated to a Zstd level:
 *   - 9 and above: ZSTD_maxCLevel()
 *   - 8:           ZSTD_maxCLevel() - 2, to make level 8 close enough to
 *                  the maximum
 *   - below 8:     clevel * 2 - 1
 *
 * The special case for level 8 has to be decided on the *Blosc* level:
 * testing the already translated value for 8 can never succeed, because
 * clevel * 2 - 1 is always odd.
 *
 * Returns the number of bytes written to `output`, or 0 when Zstd reports
 * an error.
 */
static int zstd_wrap_compress(const char* input, size_t input_length,
                              char* output, size_t maxout, int clevel) {
  size_t code;
  int zlevel;

  if (clevel >= 9) {
    zlevel = ZSTD_maxCLevel();
  }
  else if (clevel == 8) {
    /* Make the level 8 close enough to maxCLevel */
    zlevel = ZSTD_maxCLevel() - 2;
  }
  else {
    zlevel = clevel * 2 - 1;
  }

  code = ZSTD_compress(
      (void*)output, maxout, (void*)input, input_length, zlevel);
  if (ZSTD_isError(code)) {
    return 0;
  }
  return (int)code;
}
512 
/* Adapter giving ZSTD_decompress() the signature expected for
   `decompress_func`.  Returns the number of bytes written to `output`,
   or 0 when Zstd reports an error. */
static int zstd_wrap_decompress(const void* input, int compressed_length,
                                void* output, int maxout) {
  const size_t code = ZSTD_decompress(
      (void*)output, maxout, (void*)input, compressed_length);

  return ZSTD_isError(code) ? 0 : (int)code;
}
523 #endif /*  HAVE_ZSTD */
524 
initialize_decompress_func(struct blosc_context * context)525 static int initialize_decompress_func(struct blosc_context* context) {
526   int8_t header_flags = *(context->header_flags);
527   int32_t compformat = (header_flags & 0xe0) >> 5;
528   int compversion = context->compversion;
529 
530   if (compformat == BLOSC_BLOSCLZ_FORMAT) {
531     if (compversion != BLOSC_BLOSCLZ_VERSION_FORMAT) {
532       return -9;
533     }
534     context->decompress_func = &blosclz_decompress;
535     return 0;
536   }
537 #if defined(HAVE_LZ4)
538   if (compformat == BLOSC_LZ4_FORMAT) {
539     if (compversion != BLOSC_LZ4_VERSION_FORMAT) {
540       return -9;
541     }
542     context->decompress_func = &lz4_wrap_decompress;
543     return 0;
544   }
545 #endif /*  HAVE_LZ4 */
546 #if defined(HAVE_SNAPPY)
547   if (compformat == BLOSC_SNAPPY_FORMAT) {
548     if (compversion != BLOSC_SNAPPY_VERSION_FORMAT) {
549       return -9;
550     }
551     context->decompress_func = &snappy_wrap_decompress;
552     return 0;
553   }
554 #endif /*  HAVE_SNAPPY */
555 #if defined(HAVE_ZLIB)
556   if (compformat == BLOSC_ZLIB_FORMAT) {
557     if (compversion != BLOSC_ZLIB_VERSION_FORMAT) {
558       return -9;
559     }
560     context->decompress_func = &zlib_wrap_decompress;
561     return 0;
562   }
563 #endif /*  HAVE_ZLIB */
564 #if defined(HAVE_ZSTD)
565   if (compformat == BLOSC_ZSTD_FORMAT) {
566     if (compversion != BLOSC_ZSTD_VERSION_FORMAT) {
567       return -9;
568     }
569     context->decompress_func = &zstd_wrap_decompress;
570     return 0;
571   }
572 #endif /*  HAVE_ZSTD */
573   return -5; /* signals no decompression support */
574 }
575 
576 /* Compute acceleration for blosclz */
get_accel(const struct blosc_context * context)577 static int get_accel(const struct blosc_context* context) {
578   int32_t clevel = context->clevel;
579 
580   if (context->compcode == BLOSC_LZ4) {
581     /* This acceleration setting based on discussions held in:
582      * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
583      */
584     return (10 - clevel);
585   }
586   return 1;
587 }
588 
589 
590 /* Shuffle & compress a single block */
blosc_c(const struct blosc_context * context,int32_t blocksize,int32_t leftoverblock,int32_t ntbytes,int32_t maxbytes,const uint8_t * src,uint8_t * dest,uint8_t * tmp,uint8_t * tmp2)591 static int blosc_c(const struct blosc_context* context, int32_t blocksize,
592                    int32_t leftoverblock, int32_t ntbytes, int32_t maxbytes,
593                    const uint8_t *src, uint8_t *dest, uint8_t *tmp,
594                    uint8_t *tmp2)
595 {
596   int8_t header_flags = *(context->header_flags);
597   int dont_split = (header_flags & 0x10) >> 4;
598   int32_t j, neblock, nsplits;
599   int32_t cbytes;                   /* number of compressed bytes in split */
600   int32_t ctbytes = 0;              /* number of compressed bytes in block */
601   int32_t maxout;
602   int32_t typesize = context->typesize;
603   const uint8_t *_tmp = src;
604   const char *compname;
605   int accel;
606   int bscount;
607   int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
608   int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
609                       (blocksize >= typesize));
610 
611   if (doshuffle) {
612     /* Byte shuffling only makes sense if typesize > 1 */
613     blosc_internal_shuffle(typesize, blocksize, src, tmp);
614     _tmp = tmp;
615   }
616   /* We don't allow more than 1 filter at the same time (yet) */
617   else if (dobitshuffle) {
618     bscount = blosc_internal_bitshuffle(typesize, blocksize, src, tmp, tmp2);
619     if (bscount < 0)
620       return bscount;
621     _tmp = tmp;
622   }
623 
624   /* Calculate acceleration for different compressors */
625   accel = get_accel(context);
626 
627   /* The number of splits for this block */
628   if (!dont_split && !leftoverblock) {
629     nsplits = typesize;
630   }
631   else {
632     nsplits = 1;
633   }
634   neblock = blocksize / nsplits;
635   for (j = 0; j < nsplits; j++) {
636     dest += sizeof(int32_t);
637     ntbytes += (int32_t)sizeof(int32_t);
638     ctbytes += (int32_t)sizeof(int32_t);
639     maxout = neblock;
640     #if defined(HAVE_SNAPPY)
641     if (context->compcode == BLOSC_SNAPPY) {
642       /* TODO perhaps refactor this to keep the value stashed somewhere */
643       maxout = snappy_max_compressed_length(neblock);
644     }
645     #endif /*  HAVE_SNAPPY */
646     if (ntbytes+maxout > maxbytes) {
647       maxout = maxbytes - ntbytes;   /* avoid buffer overrun */
648       if (maxout <= 0) {
649         return 0;                  /* non-compressible block */
650       }
651     }
652     if (context->compcode == BLOSC_BLOSCLZ) {
653       cbytes = blosclz_compress(context->clevel, _tmp+j*neblock, neblock,
654                                 dest, maxout);
655     }
656     #if defined(HAVE_LZ4)
657     else if (context->compcode == BLOSC_LZ4) {
658       cbytes = lz4_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
659                                  (char *)dest, (size_t)maxout, accel);
660     }
661     else if (context->compcode == BLOSC_LZ4HC) {
662       cbytes = lz4hc_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
663                                    (char *)dest, (size_t)maxout,
664                                    context->clevel);
665     }
666     #endif /* HAVE_LZ4 */
667     #if defined(HAVE_SNAPPY)
668     else if (context->compcode == BLOSC_SNAPPY) {
669       cbytes = snappy_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
670                                     (char *)dest, (size_t)maxout);
671     }
672     #endif /* HAVE_SNAPPY */
673     #if defined(HAVE_ZLIB)
674     else if (context->compcode == BLOSC_ZLIB) {
675       cbytes = zlib_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
676                                   (char *)dest, (size_t)maxout,
677                                   context->clevel);
678     }
679     #endif /* HAVE_ZLIB */
680     #if defined(HAVE_ZSTD)
681     else if (context->compcode == BLOSC_ZSTD) {
682       cbytes = zstd_wrap_compress((char*)_tmp + j * neblock, (size_t)neblock,
683                                   (char*)dest, (size_t)maxout, context->clevel);
684     }
685     #endif /* HAVE_ZSTD */
686 
687     else {
688       blosc_compcode_to_compname(context->compcode, &compname);
689       if (compname == NULL) {
690           compname = "(null)";
691       }
692       fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
693       fprintf(stderr, "compression support.  Please use one having it.");
694       return -5;    /* signals no compression support */
695     }
696 
697     if (cbytes > maxout) {
698       /* Buffer overrun caused by compression (should never happen) */
699       return -1;
700     }
701     else if (cbytes < 0) {
702       /* cbytes should never be negative */
703       return -2;
704     }
705     else if (cbytes == 0 || cbytes == neblock) {
706       /* The compressor has been unable to compress data at all. */
707       /* Before doing the copy, check that we are not running into a
708          buffer overflow. */
709       if ((ntbytes+neblock) > maxbytes) {
710         return 0;    /* Non-compressible data */
711       }
712       fastcopy(dest, _tmp + j * neblock, neblock);
713       cbytes = neblock;
714     }
715     _sw32(dest - 4, cbytes);
716     dest += cbytes;
717     ntbytes += cbytes;
718     ctbytes += cbytes;
719   }  /* Closes j < nsplits */
720 
721   return ctbytes;
722 }
723 
724 /* Decompress & unshuffle a single block */
blosc_d(struct blosc_context * context,int32_t blocksize,int32_t leftoverblock,const uint8_t * base_src,int32_t src_offset,uint8_t * dest,uint8_t * tmp,uint8_t * tmp2)725 static int blosc_d(struct blosc_context* context, int32_t blocksize,
726                    int32_t leftoverblock, const uint8_t* base_src,
727                    int32_t src_offset, uint8_t* dest, uint8_t* tmp,
728                    uint8_t* tmp2) {
729   int8_t header_flags = *(context->header_flags);
730   int dont_split = (header_flags & 0x10) >> 4;
731   int32_t j, neblock, nsplits;
732   int32_t nbytes;                /* number of decompressed bytes in split */
733   const int32_t compressedsize = context->compressedsize;
734   int32_t cbytes;                /* number of compressed bytes in split */
735   int32_t ctbytes = 0;           /* number of compressed bytes in block */
736   int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
737   uint8_t *_tmp = dest;
738   int32_t typesize = context->typesize;
739   int bscount;
740   int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
741   int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
742                       (blocksize >= typesize));
743   const uint8_t* src;
744 
745   if (doshuffle || dobitshuffle) {
746     _tmp = tmp;
747   }
748 
749   /* The number of splits for this block */
750   if (!dont_split &&
751       /* For compatibility with before the introduction of the split flag */
752       ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE) &&
753       !leftoverblock) {
754     nsplits = typesize;
755   }
756   else {
757     nsplits = 1;
758   }
759 
760   neblock = blocksize / nsplits;
761   for (j = 0; j < nsplits; j++) {
762     /* Validate src_offset */
763     if (src_offset < 0 || src_offset > compressedsize - sizeof(int32_t)) {
764       return -1;
765     }
766     cbytes = sw32_(base_src + src_offset); /* amount of compressed bytes */
767     src_offset += sizeof(int32_t);
768     /* Validate cbytes */
769     if (cbytes < 0 || cbytes > context->compressedsize - src_offset) {
770       return -1;
771     }
772     ctbytes += (int32_t)sizeof(int32_t);
773     src = base_src + src_offset;
774     /* Uncompress */
775     if (cbytes == neblock) {
776       fastcopy(_tmp, src, neblock);
777       nbytes = neblock;
778     }
779     else {
780       nbytes = context->decompress_func(src, cbytes, _tmp, neblock);
781       /* Check that decompressed bytes number is correct */
782       if (nbytes != neblock) {
783         return -2;
784       }
785     }
786     src_offset += cbytes;
787     ctbytes += cbytes;
788     _tmp += nbytes;
789     ntbytes += nbytes;
790   } /* Closes j < nsplits */
791 
792   if (doshuffle) {
793     blosc_internal_unshuffle(typesize, blocksize, tmp, dest);
794   }
795   else if (dobitshuffle) {
796     bscount = blosc_internal_bitunshuffle(typesize, blocksize, tmp, dest, tmp2);
797     if (bscount < 0)
798       return bscount;
799   }
800 
801   /* Return the number of uncompressed bytes */
802   return ntbytes;
803 }
804 
805 /* Serial version for compression/decompression */
serial_blosc(struct blosc_context * context)806 static int serial_blosc(struct blosc_context* context)
807 {
808   int32_t j, bsize, leftoverblock;
809   int32_t cbytes;
810 
811   int32_t ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
812   int32_t ntbytes = context->num_output_bytes;
813 
814   uint8_t *tmp = my_malloc(context->blocksize + ebsize);
815   uint8_t *tmp2 = tmp + context->blocksize;
816 
817   for (j = 0; j < context->nblocks; j++) {
818     if (context->compress && !(*(context->header_flags) & BLOSC_MEMCPYED)) {
819       _sw32(context->bstarts + j * 4, ntbytes);
820     }
821     bsize = context->blocksize;
822     leftoverblock = 0;
823     if ((j == context->nblocks - 1) && (context->leftover > 0)) {
824       bsize = context->leftover;
825       leftoverblock = 1;
826     }
827     if (context->compress) {
828       if (*(context->header_flags) & BLOSC_MEMCPYED) {
829         /* We want to memcpy only */
830         fastcopy(context->dest + BLOSC_MAX_OVERHEAD + j * context->blocksize,
831                  context->src + j * context->blocksize, bsize);
832         cbytes = bsize;
833       }
834       else {
835         /* Regular compression */
836         cbytes = blosc_c(context, bsize, leftoverblock, ntbytes,
837                          context->destsize, context->src+j*context->blocksize,
838                          context->dest+ntbytes, tmp, tmp2);
839         if (cbytes == 0) {
840           ntbytes = 0;              /* uncompressible data */
841           break;
842         }
843       }
844     }
845     else {
846       if (*(context->header_flags) & BLOSC_MEMCPYED) {
847         /* We want to memcpy only */
848         fastcopy(context->dest + j * context->blocksize,
849                  context->src + BLOSC_MAX_OVERHEAD + j * context->blocksize, bsize);
850         cbytes = bsize;
851       }
852       else {
853         /* Regular decompression */
854         cbytes = blosc_d(context, bsize, leftoverblock, context->src,
855                          sw32_(context->bstarts + j * 4),
856                          context->dest + j * context->blocksize, tmp, tmp2);
857       }
858     }
859     if (cbytes < 0) {
860       ntbytes = cbytes;         /* error in blosc_c or blosc_d */
861       break;
862     }
863     ntbytes += cbytes;
864   }
865 
866   /* Free temporaries */
867   my_free(tmp);
868 
869   return ntbytes;
870 }
871 
872 
873 /* Threaded version for compression/decompression */
parallel_blosc(struct blosc_context * context)874 static int parallel_blosc(struct blosc_context* context)
875 {
876   int rc;
877   (void)rc;  // just to avoid 'unused-variable' warning
878 
879   /* Check whether we need to restart threads */
880   if (blosc_set_nthreads_(context) < 0) {
881     return -1;
882   }
883 
884   /* Set sentinels */
885   context->thread_giveup_code = 1;
886   context->thread_nblock = -1;
887 
888   /* Synchronization point for all threads (wait for initialization) */
889   WAIT_INIT(-1, context);
890 
891   /* Synchronization point for all threads (wait for finalization) */
892   WAIT_FINISH(-1, context);
893 
894   if (context->thread_giveup_code > 0) {
895     /* Return the total bytes (de-)compressed in threads */
896     return context->num_output_bytes;
897   }
898   else {
899     /* Compression/decompression gave up.  Return error code. */
900     return context->thread_giveup_code;
901   }
902 }
903 
904 
905 /* Do the compression or decompression of the buffer depending on the
906    global params. */
do_job(struct blosc_context * context)907 static int do_job(struct blosc_context* context)
908 {
909   int32_t ntbytes;
910 
911   /* Run the serial version when nthreads is 1 or when the buffers are
912      not much larger than blocksize */
913   if (context->numthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
914     ntbytes = serial_blosc(context);
915   }
916   else {
917     ntbytes = parallel_blosc(context);
918   }
919 
920   return ntbytes;
921 }
922 
923 
/* Evaluates to 1 when `codec` is one of the codecs meant for High
   Compression Ratios (LZ4HC, Zlib or Zstd) and to 0 otherwise.
   Beware: `codec` is expanded more than once. */
#define HCR(codec) (                          \
             ((codec) == BLOSC_LZ4HC) ||      \
             ((codec) == BLOSC_ZLIB)  ||      \
             ((codec) == BLOSC_ZSTD) )
929 
930 
931 /* Conditions for splitting a block before compressing with a codec. */
split_block(int compcode,int typesize,int blocksize)932 static int split_block(int compcode, int typesize, int blocksize) {
933   int splitblock = -1;
934 
935   switch (g_splitmode) {
936     case BLOSC_ALWAYS_SPLIT:
937       splitblock = 1;
938       break;
939     case BLOSC_NEVER_SPLIT:
940       splitblock = 0;
941       break;
942     case BLOSC_AUTO_SPLIT:
943       /* Normally all the compressors designed for speed benefit from a
944          split.  However, in conducted benchmarks LZ4 seems that it runs
945          faster if we don't split, which is quite surprising. */
946       splitblock= (((compcode == BLOSC_BLOSCLZ) ||
947                     (compcode == BLOSC_SNAPPY)) &&
948                    (typesize <= MAX_SPLITS) &&
949                    (blocksize / typesize) >= MIN_BUFFERSIZE);
950       break;
951     case BLOSC_FORWARD_COMPAT_SPLIT:
952       /* The zstd support was introduced at the same time than the split flag, so
953        * there should be not a problem with not splitting bloscks with it */
954       splitblock = ((compcode != BLOSC_ZSTD) &&
955                     (typesize <= MAX_SPLITS) &&
956                     (blocksize / typesize) >= MIN_BUFFERSIZE);
957       break;
958     default:
959       fprintf(stderr, "Split mode %d not supported", g_splitmode);
960   }
961   return splitblock;
962 }
963 
964 
compute_blocksize(struct blosc_context * context,int32_t clevel,int32_t typesize,int32_t nbytes,int32_t forced_blocksize)965 static int32_t compute_blocksize(struct blosc_context* context, int32_t clevel,
966                                  int32_t typesize, int32_t nbytes,
967                                  int32_t forced_blocksize)
968 {
969   int32_t blocksize;
970 
971   /* Protection against very small buffers */
972   if (nbytes < (int32_t)typesize) {
973     return 1;
974   }
975 
976   blocksize = nbytes;           /* Start by a whole buffer as blocksize */
977 
978   if (forced_blocksize) {
979     blocksize = forced_blocksize;
980     /* Check that forced blocksize is not too small */
981     if (blocksize < MIN_BUFFERSIZE) {
982       blocksize = MIN_BUFFERSIZE;
983     }
984     /* Check that forced blocksize is not too large */
985     if (blocksize > BLOSC_MAX_BLOCKSIZE) {
986       blocksize = BLOSC_MAX_BLOCKSIZE;
987     }
988   }
989   else if (nbytes >= L1) {
990     blocksize = L1;
991 
992     /* For HCR codecs, increase the block sizes by a factor of 2 because they
993        are meant for compressing large blocks (i.e. they show a big overhead
994        when compressing small ones). */
995     if (HCR(context->compcode)) {
996       blocksize *= 2;
997     }
998 
999     switch (clevel) {
1000       case 0:
1001         /* Case of plain copy */
1002         blocksize /= 4;
1003         break;
1004       case 1:
1005         blocksize /= 2;
1006         break;
1007       case 2:
1008         blocksize *= 1;
1009         break;
1010       case 3:
1011         blocksize *= 2;
1012         break;
1013       case 4:
1014       case 5:
1015         blocksize *= 4;
1016         break;
1017       case 6:
1018       case 7:
1019       case 8:
1020         blocksize *= 8;
1021         break;
1022       case 9:
1023         blocksize *= 8;
1024         if (HCR(context->compcode)) {
1025           blocksize *= 2;
1026         }
1027         break;
1028       default:
1029         assert(0);
1030         break;
1031     }
1032   }
1033 
1034   /* Enlarge the blocksize for splittable codecs */
1035   if (clevel > 0 && split_block(context->compcode, typesize, blocksize)) {
1036     if (blocksize > (1 << 18)) {
1037       /* Do not use a too large split buffer (> 256 KB) for splitting codecs */
1038       blocksize = (1 << 18);
1039     }
1040     blocksize *= typesize;
1041     if (blocksize < (1 << 16)) {
1042       /* Do not use a too small blocksize (< 64 KB) when typesize is small */
1043       blocksize = (1 << 16);
1044     }
1045     if (blocksize > 1024 * 1024) {
1046       /* But do not exceed 1 MB per thread (having this capacity in L3 is normal in modern CPUs) */
1047       blocksize = 1024 * 1024;
1048     }
1049 
1050   }
1051 
1052   /* Check that blocksize is not too large */
1053   if (blocksize > (int32_t)nbytes) {
1054     blocksize = nbytes;
1055   }
1056 
1057   /* blocksize *must absolutely* be a multiple of the typesize */
1058   if (blocksize > typesize) {
1059     blocksize = blocksize / typesize * typesize;
1060   }
1061 
1062   return blocksize;
1063 }
1064 
initialize_context_compression(struct blosc_context * context,int clevel,int doshuffle,size_t typesize,size_t sourcesize,const void * src,void * dest,size_t destsize,int32_t compressor,int32_t blocksize,int32_t numthreads)1065 static int initialize_context_compression(struct blosc_context* context,
1066                           int clevel,
1067                           int doshuffle,
1068                           size_t typesize,
1069                           size_t sourcesize,
1070                           const void* src,
1071                           void* dest,
1072                           size_t destsize,
1073                           int32_t compressor,
1074                           int32_t blocksize,
1075                           int32_t numthreads)
1076 {
1077   char *envvar = NULL;
1078   int warnlvl = 0;
1079   /* Set parameters */
1080   context->compress = 1;
1081   context->src = (const uint8_t*)src;
1082   context->dest = (uint8_t *)(dest);
1083   context->num_output_bytes = 0;
1084   context->destsize = (int32_t)destsize;
1085   context->sourcesize = sourcesize;
1086   context->typesize = typesize;
1087   context->compcode = compressor;
1088   context->numthreads = numthreads;
1089   context->end_threads = 0;
1090   context->clevel = clevel;
1091 
1092   envvar = getenv("BLOSC_WARN");
1093   if (envvar != NULL) {
1094     warnlvl = strtol(envvar, NULL, 10);
1095   }
1096 
1097   /* Check buffer size limits */
1098   if (sourcesize > BLOSC_MAX_BUFFERSIZE) {
1099     if (warnlvl > 0) {
1100       fprintf(stderr, "Input buffer size cannot exceed %d bytes\n",
1101               BLOSC_MAX_BUFFERSIZE);
1102     }
1103     return 0;
1104   }
1105   if (destsize < BLOSC_MAX_OVERHEAD) {
1106     if (warnlvl > 0) {
1107       fprintf(stderr, "Output buffer size should be larger than %d bytes\n",
1108               BLOSC_MAX_OVERHEAD);
1109     }
1110     return 0;
1111   }
1112 
1113   /* Compression level */
1114   if (clevel < 0 || clevel > 9) {
1115     fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
1116     return -10;
1117   }
1118 
1119   /* Shuffle */
1120   if (doshuffle != 0 && doshuffle != 1 && doshuffle != 2) {
1121     fprintf(stderr, "`shuffle` parameter must be either 0, 1 or 2!\n");
1122     return -10;
1123   }
1124 
1125   /* Check typesize limits */
1126   if (context->typesize > BLOSC_MAX_TYPESIZE) {
1127     /* If typesize is too large, treat buffer as an 1-byte stream. */
1128     context->typesize = 1;
1129   }
1130 
1131   /* Get the blocksize */
1132   context->blocksize = compute_blocksize(context, clevel, (int32_t)context->typesize, context->sourcesize, blocksize);
1133 
1134   /* Compute number of blocks in buffer */
1135   context->nblocks = context->sourcesize / context->blocksize;
1136   context->leftover = context->sourcesize % context->blocksize;
1137   context->nblocks = (context->leftover > 0) ? (context->nblocks + 1) : context->nblocks;
1138 
1139   return 1;
1140 }
1141 
1142 
write_compression_header(struct blosc_context * context,int clevel,int doshuffle)1143 static int write_compression_header(struct blosc_context* context, int clevel, int doshuffle)
1144 {
1145   int32_t compformat;
1146   int dont_split;
1147 
1148   /* Write version header for this block */
1149   context->dest[0] = BLOSC_VERSION_FORMAT;           /* blosc format version */
1150 
1151   /* Write compressor format */
1152   compformat = -1;
1153   switch (context->compcode)
1154   {
1155   case BLOSC_BLOSCLZ:
1156     compformat = BLOSC_BLOSCLZ_FORMAT;
1157     context->dest[1] = BLOSC_BLOSCLZ_VERSION_FORMAT; /* blosclz format version */
1158     break;
1159 
1160 #if defined(HAVE_LZ4)
1161   case BLOSC_LZ4:
1162     compformat = BLOSC_LZ4_FORMAT;
1163     context->dest[1] = BLOSC_LZ4_VERSION_FORMAT;  /* lz4 format version */
1164     break;
1165   case BLOSC_LZ4HC:
1166     compformat = BLOSC_LZ4HC_FORMAT;
1167     context->dest[1] = BLOSC_LZ4HC_VERSION_FORMAT; /* lz4hc is the same as lz4 */
1168     break;
1169 #endif /* HAVE_LZ4 */
1170 
1171 #if defined(HAVE_SNAPPY)
1172   case BLOSC_SNAPPY:
1173     compformat = BLOSC_SNAPPY_FORMAT;
1174     context->dest[1] = BLOSC_SNAPPY_VERSION_FORMAT;    /* snappy format version */
1175     break;
1176 #endif /* HAVE_SNAPPY */
1177 
1178 #if defined(HAVE_ZLIB)
1179   case BLOSC_ZLIB:
1180     compformat = BLOSC_ZLIB_FORMAT;
1181     context->dest[1] = BLOSC_ZLIB_VERSION_FORMAT;      /* zlib format version */
1182     break;
1183 #endif /* HAVE_ZLIB */
1184 
1185 #if defined(HAVE_ZSTD)
1186   case BLOSC_ZSTD:
1187     compformat = BLOSC_ZSTD_FORMAT;
1188     context->dest[1] = BLOSC_ZSTD_VERSION_FORMAT;      /* zstd format version */
1189     break;
1190 #endif /* HAVE_ZSTD */
1191 
1192   default:
1193   {
1194     const char *compname;
1195     compname = clibcode_to_clibname(compformat);
1196     if (compname == NULL) {
1197         compname = "(null)";
1198     }
1199     fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
1200     fprintf(stderr, "compression support.  Please use one having it.");
1201     return -5;    /* signals no compression support */
1202     break;
1203   }
1204   }
1205 
1206   context->header_flags = context->dest+2;  /* flags */
1207   context->dest[2] = 0;  /* zeroes flags */
1208   context->dest[3] = (uint8_t)context->typesize;  /* type size */
1209   _sw32(context->dest + 4, context->sourcesize);  /* size of the buffer */
1210   _sw32(context->dest + 8, context->blocksize);  /* block size */
1211   context->bstarts = context->dest + 16;  /* starts for every block */
1212   context->num_output_bytes = 16 + sizeof(int32_t)*context->nblocks;  /* space for header and pointers */
1213 
1214   if (context->clevel == 0) {
1215     /* Compression level 0 means buffer to be memcpy'ed */
1216     *(context->header_flags) |= BLOSC_MEMCPYED;
1217     context->num_output_bytes = 16;      /* space just for header */
1218   }
1219 
1220   if (context->sourcesize < MIN_BUFFERSIZE) {
1221     /* Buffer is too small.  Try memcpy'ing. */
1222     *(context->header_flags) |= BLOSC_MEMCPYED;
1223     context->num_output_bytes = 16;      /* space just for header */
1224   }
1225 
1226   if (doshuffle == BLOSC_SHUFFLE) {
1227     /* Byte-shuffle is active */
1228     *(context->header_flags) |= BLOSC_DOSHUFFLE;     /* bit 0 set to one in flags */
1229   }
1230 
1231   if (doshuffle == BLOSC_BITSHUFFLE) {
1232     /* Bit-shuffle is active */
1233     *(context->header_flags) |= BLOSC_DOBITSHUFFLE;  /* bit 2 set to one in flags */
1234   }
1235 
1236   dont_split = !split_block(context->compcode, context->typesize,
1237                             context->blocksize);
1238   *(context->header_flags) |= dont_split << 4;  /* dont_split is in bit 4 */
1239   *(context->header_flags) |= compformat << 5;  /* compressor format starts at bit 5 */
1240 
1241   return 1;
1242 }
1243 
1244 
blosc_compress_context(struct blosc_context * context)1245 int blosc_compress_context(struct blosc_context* context)
1246 {
1247   int32_t ntbytes = 0;
1248 
1249   if ((*(context->header_flags) & BLOSC_MEMCPYED) &&
1250       (context->sourcesize + BLOSC_MAX_OVERHEAD > context->destsize)) {
1251     return 0;   /* data cannot be copied without overrun destination */
1252   }
1253 
1254   /* Do the actual compression */
1255   ntbytes = do_job(context);
1256   if (ntbytes < 0) {
1257     return -1;
1258   }
1259   if ((ntbytes == 0) && (context->sourcesize + BLOSC_MAX_OVERHEAD <= context->destsize)) {
1260     /* Last chance for fitting `src` buffer in `dest`.  Update flags and force a copy. */
1261     *(context->header_flags) |= BLOSC_MEMCPYED;
1262     context->num_output_bytes = BLOSC_MAX_OVERHEAD;  /* reset the output bytes in previous step */
1263     ntbytes = do_job(context);
1264     if (ntbytes < 0) {
1265       return -1;
1266     }
1267   }
1268 
1269   /* Set the number of compressed bytes in header */
1270   _sw32(context->dest + 12, ntbytes);
1271 
1272   assert(ntbytes <= context->destsize);
1273   return ntbytes;
1274 }
1275 
1276 /* The public routine for compression with context. */
blosc_compress_ctx(int clevel,int doshuffle,size_t typesize,size_t nbytes,const void * src,void * dest,size_t destsize,const char * compressor,size_t blocksize,int numinternalthreads)1277 int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize,
1278                        size_t nbytes, const void* src, void* dest,
1279                        size_t destsize, const char* compressor,
1280                        size_t blocksize, int numinternalthreads)
1281 {
1282   int error, result;
1283   struct blosc_context context;
1284 
1285   context.threads_started = 0;
1286   error = initialize_context_compression(&context, clevel, doshuffle, typesize,
1287 					 nbytes, src, dest, destsize,
1288 					 blosc_compname_to_compcode(compressor),
1289 					 blocksize, numinternalthreads);
1290   if (error <= 0) { return error; }
1291 
1292   error = write_compression_header(&context, clevel, doshuffle);
1293   if (error <= 0) { return error; }
1294 
1295   result = blosc_compress_context(&context);
1296 
1297   if (numinternalthreads > 1)
1298   {
1299     blosc_release_threadpool(&context);
1300   }
1301 
1302   return result;
1303 }
1304 
1305 /* The public routine for compression.  See blosc.h for docstrings. */
blosc_compress(int clevel,int doshuffle,size_t typesize,size_t nbytes,const void * src,void * dest,size_t destsize)1306 int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
1307                    const void *src, void *dest, size_t destsize)
1308 {
1309   int result;
1310   char* envvar;
1311 
1312   /* Check if should initialize */
1313   if (!g_initlib) blosc_init();
1314 
1315   /* Check for environment variables */
1316   envvar = getenv("BLOSC_CLEVEL");
1317   if (envvar != NULL) {
1318     long value;
1319     value = strtol(envvar, NULL, 10);
1320     if ((value != EINVAL) && (value >= 0)) {
1321       clevel = (int)value;
1322     }
1323   }
1324 
1325   envvar = getenv("BLOSC_SHUFFLE");
1326   if (envvar != NULL) {
1327     if (strcmp(envvar, "NOSHUFFLE") == 0) {
1328       doshuffle = BLOSC_NOSHUFFLE;
1329     }
1330     if (strcmp(envvar, "SHUFFLE") == 0) {
1331       doshuffle = BLOSC_SHUFFLE;
1332     }
1333     if (strcmp(envvar, "BITSHUFFLE") == 0) {
1334       doshuffle = BLOSC_BITSHUFFLE;
1335     }
1336   }
1337 
1338   envvar = getenv("BLOSC_TYPESIZE");
1339   if (envvar != NULL) {
1340     long value;
1341     value = strtol(envvar, NULL, 10);
1342     if ((value != EINVAL) && (value > 0)) {
1343       typesize = (int)value;
1344     }
1345   }
1346 
1347   envvar = getenv("BLOSC_COMPRESSOR");
1348   if (envvar != NULL) {
1349     result = blosc_set_compressor(envvar);
1350     if (result < 0) { return result; }
1351   }
1352 
1353   envvar = getenv("BLOSC_BLOCKSIZE");
1354   if (envvar != NULL) {
1355     long blocksize;
1356     blocksize = strtol(envvar, NULL, 10);
1357     if ((blocksize != EINVAL) && (blocksize > 0)) {
1358       blosc_set_blocksize((size_t)blocksize);
1359     }
1360   }
1361 
1362   envvar = getenv("BLOSC_NTHREADS");
1363   if (envvar != NULL) {
1364     long nthreads;
1365     nthreads = strtol(envvar, NULL, 10);
1366     if ((nthreads != EINVAL) && (nthreads > 0)) {
1367       result = blosc_set_nthreads((int)nthreads);
1368       if (result < 0) { return result; }
1369     }
1370   }
1371 
1372   envvar = getenv("BLOSC_SPLITMODE");
1373   if (envvar != NULL) {
1374     if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
1375       blosc_set_splitmode(BLOSC_FORWARD_COMPAT_SPLIT);
1376     }
1377     else if (strcmp(envvar, "AUTO") == 0) {
1378       blosc_set_splitmode(BLOSC_AUTO_SPLIT);
1379     }
1380     else if (strcmp(envvar, "ALWAYS") == 0) {
1381       blosc_set_splitmode(BLOSC_ALWAYS_SPLIT);
1382     }
1383     else if (strcmp(envvar, "NEVER") == 0) {
1384       blosc_set_splitmode(BLOSC_NEVER_SPLIT);
1385     }
1386     else {
1387       fprintf(stderr, "BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
1388       return -1;
1389     }
1390   }
1391 
1392   /* Check for a BLOSC_NOLOCK environment variable.  It is important
1393      that this should be the last env var so that it can take the
1394      previous ones into account */
1395   envvar = getenv("BLOSC_NOLOCK");
1396   if (envvar != NULL) {
1397     const char *compname;
1398     blosc_compcode_to_compname(g_compressor, &compname);
1399     result = blosc_compress_ctx(clevel, doshuffle, typesize,
1400 				nbytes, src, dest, destsize,
1401 				compname, g_force_blocksize, g_threads);
1402     return result;
1403   }
1404 
1405   pthread_mutex_lock(global_comp_mutex);
1406 
1407   do {
1408     result = initialize_context_compression(g_global_context, clevel, doshuffle,
1409                                            typesize, nbytes, src, dest, destsize,
1410                                            g_compressor, g_force_blocksize,
1411                                            g_threads);
1412     if (result <= 0) { break; }
1413 
1414     result = write_compression_header(g_global_context, clevel, doshuffle);
1415     if (result <= 0) { break; }
1416 
1417     result = blosc_compress_context(g_global_context);
1418   } while (0);
1419 
1420   pthread_mutex_unlock(global_comp_mutex);
1421 
1422   return result;
1423 }
1424 
blosc_run_decompression_with_context(struct blosc_context * context,const void * src,void * dest,size_t destsize,int numinternalthreads)1425 static int blosc_run_decompression_with_context(struct blosc_context* context,
1426                                                 const void* src,
1427                                                 void* dest,
1428                                                 size_t destsize,
1429                                                 int numinternalthreads)
1430 {
1431   uint8_t version;
1432   int32_t ntbytes;
1433 
1434   context->compress = 0;
1435   context->src = (const uint8_t*)src;
1436   context->dest = (uint8_t*)dest;
1437   context->destsize = destsize;
1438   context->num_output_bytes = 0;
1439   context->numthreads = numinternalthreads;
1440   context->end_threads = 0;
1441 
1442   /* Read the header block */
1443   version = context->src[0];                        /* blosc format version */
1444   context->compversion = context->src[1];
1445 
1446   context->header_flags = (uint8_t*)(context->src + 2);           /* flags */
1447   context->typesize = (int32_t)context->src[3];      /* typesize */
1448   context->sourcesize = sw32_(context->src + 4);     /* buffer size */
1449   context->blocksize = sw32_(context->src + 8);      /* block size */
1450   context->compressedsize = sw32_(context->src + 12); /* compressed buffer size */
1451   context->bstarts = (uint8_t*)(context->src + 16);
1452 
1453   if (context->sourcesize == 0) {
1454     /* Source buffer was empty, so we are done */
1455     return 0;
1456   }
1457 
1458   if (context->blocksize <= 0 || context->blocksize > destsize ||
1459       context->blocksize > BLOSC_MAX_BLOCKSIZE || context->typesize <= 0 ||
1460       context->typesize > BLOSC_MAX_TYPESIZE) {
1461     return -1;
1462   }
1463 
1464   if (version != BLOSC_VERSION_FORMAT) {
1465     /* Version from future */
1466     return -1;
1467   }
1468   if (*context->header_flags & 0x08) {
1469     /* compressor flags from the future */
1470     return -1;
1471   }
1472 
1473   /* Compute some params */
1474   /* Total blocks */
1475   context->nblocks = context->sourcesize / context->blocksize;
1476   context->leftover = context->sourcesize % context->blocksize;
1477   context->nblocks = (context->leftover>0)? context->nblocks+1: context->nblocks;
1478 
1479   /* Check that we have enough space to decompress */
1480   if (context->sourcesize > (int32_t)destsize) {
1481     return -1;
1482   }
1483 
1484   if (*(context->header_flags) & BLOSC_MEMCPYED) {
1485     /* Validate that compressed size is equal to decompressed size + header
1486        size. */
1487     if (context->sourcesize + BLOSC_MAX_OVERHEAD != context->compressedsize) {
1488       return -1;
1489     }
1490   } else {
1491     ntbytes = initialize_decompress_func(context);
1492     if (ntbytes != 0) return ntbytes;
1493 
1494     /* Validate that compressed size is large enough to hold the bstarts array */
1495     if (context->nblocks > (context->compressedsize - 16) / 4) {
1496       return -1;
1497     }
1498   }
1499 
1500   /* Do the actual decompression */
1501   ntbytes = do_job(context);
1502   if (ntbytes < 0) {
1503     return -1;
1504   }
1505 
1506   assert(ntbytes <= (int32_t)destsize);
1507   return ntbytes;
1508 }
1509 
blosc_decompress_ctx(const void * src,void * dest,size_t destsize,int numinternalthreads)1510 int blosc_decompress_ctx(const void* src, void* dest, size_t destsize,
1511                          int numinternalthreads) {
1512   int result;
1513   struct blosc_context context;
1514 
1515   context.threads_started = 0;
1516   result = blosc_run_decompression_with_context(&context, src, dest, destsize,
1517                                                 numinternalthreads);
1518 
1519   if (numinternalthreads > 1)
1520   {
1521     blosc_release_threadpool(&context);
1522   }
1523 
1524   return result;
1525 }
1526 
blosc_decompress(const void * src,void * dest,size_t destsize)1527 int blosc_decompress(const void* src, void* dest, size_t destsize) {
1528   int result;
1529   char* envvar;
1530   long nthreads;
1531 
1532   /* Check if should initialize */
1533   if (!g_initlib) blosc_init();
1534 
1535   /* Check for a BLOSC_NTHREADS environment variable */
1536   envvar = getenv("BLOSC_NTHREADS");
1537   if (envvar != NULL) {
1538     nthreads = strtol(envvar, NULL, 10);
1539     if ((nthreads != EINVAL) && (nthreads > 0)) {
1540       result = blosc_set_nthreads((int)nthreads);
1541       if (result < 0) { return result; }
1542     }
1543   }
1544 
1545   /* Check for a BLOSC_NOLOCK environment variable.  It is important
1546      that this should be the last env var so that it can take the
1547      previous ones into account */
1548   envvar = getenv("BLOSC_NOLOCK");
1549   if (envvar != NULL) {
1550     result = blosc_decompress_ctx(src, dest, destsize, g_threads);
1551     return result;
1552   }
1553 
1554   pthread_mutex_lock(global_comp_mutex);
1555 
1556   result = blosc_run_decompression_with_context(g_global_context, src, dest,
1557                                                 destsize, g_threads);
1558 
1559   pthread_mutex_unlock(global_comp_mutex);
1560 
1561   return result;
1562 }
1563 
blosc_getitem(const void * src,int start,int nitems,void * dest)1564 int blosc_getitem(const void* src, int start, int nitems, void* dest) {
1565   uint8_t *_src=NULL;               /* current pos for source buffer */
1566   uint8_t version, compversion;     /* versions for compressed header */
1567   uint8_t flags;                    /* flags for header */
1568   int32_t ntbytes = 0;              /* the number of uncompressed bytes */
1569   int32_t nblocks;                  /* number of total blocks in buffer */
1570   int32_t leftover;                 /* extra bytes at end of buffer */
1571   uint8_t *bstarts;                 /* start pointers for each block */
1572   int32_t typesize, blocksize, nbytes, compressedsize;
1573   int32_t j, bsize, bsize2, leftoverblock;
1574   int32_t cbytes, startb, stopb;
1575   int stop = start + nitems;
1576   uint8_t *tmp;
1577   uint8_t *tmp2;
1578   uint8_t *tmp3;
1579   int32_t ebsize;
1580   struct blosc_context context = {0};
1581 
1582   _src = (uint8_t *)(src);
1583 
1584   /* Read the header block */
1585   version = _src[0];                        /* blosc format version */
1586   compversion = _src[1];
1587   flags = _src[2];                          /* flags */
1588   typesize = (int32_t)_src[3];              /* typesize */
1589   nbytes = sw32_(_src + 4);                 /* buffer size */
1590   blocksize = sw32_(_src + 8);              /* block size */
1591   compressedsize = sw32_(_src + 12); /* compressed buffer size */
1592 
1593   if (version != BLOSC_VERSION_FORMAT)
1594     return -9;
1595 
1596   if (blocksize <= 0 || blocksize > nbytes || blocksize > BLOSC_MAX_BLOCKSIZE ||
1597       typesize <= 0 || typesize > BLOSC_MAX_TYPESIZE) {
1598     return -1;
1599   }
1600 
1601   /* Compute some params */
1602   /* Total blocks */
1603   nblocks = nbytes / blocksize;
1604   leftover = nbytes % blocksize;
1605   nblocks = (leftover>0)? nblocks+1: nblocks;
1606 
1607   /* Only initialize the fields blosc_d uses */
1608   context.typesize = typesize;
1609   context.header_flags = &flags;
1610   context.compversion = compversion;
1611   context.compressedsize = compressedsize;
1612   if (flags & BLOSC_MEMCPYED) {
1613     if (nbytes + BLOSC_MAX_OVERHEAD != compressedsize) {
1614       return -1;
1615     }
1616   } else {
1617     ntbytes = initialize_decompress_func(&context);
1618     if (ntbytes != 0) return ntbytes;
1619 
1620     if (nblocks >= (compressedsize - 16) / 4) {
1621       return -1;
1622     }
1623   }
1624 
1625   ebsize = blocksize + typesize * (int32_t)sizeof(int32_t);
1626   tmp = my_malloc(blocksize + ebsize + blocksize);
1627   tmp2 = tmp + blocksize;
1628   tmp3 = tmp + blocksize + ebsize;
1629 
1630   _src += 16;
1631   bstarts = _src;
1632   _src += sizeof(int32_t)*nblocks;
1633 
1634   /* Check region boundaries */
1635   if ((start < 0) || (start*typesize > nbytes)) {
1636     fprintf(stderr, "`start` out of bounds");
1637     return -1;
1638   }
1639 
1640   if ((stop < 0) || (stop*typesize > nbytes)) {
1641     fprintf(stderr, "`start`+`nitems` out of bounds");
1642     return -1;
1643   }
1644 
1645   for (j = 0; j < nblocks; j++) {
1646     bsize = blocksize;
1647     leftoverblock = 0;
1648     if ((j == nblocks - 1) && (leftover > 0)) {
1649       bsize = leftover;
1650       leftoverblock = 1;
1651     }
1652 
1653     /* Compute start & stop for each block */
1654     startb = start * typesize - j * blocksize;
1655     stopb = stop * typesize - j * blocksize;
1656     if ((startb >= (int)blocksize) || (stopb <= 0)) {
1657       continue;
1658     }
1659     if (startb < 0) {
1660       startb = 0;
1661     }
1662     if (stopb > (int)blocksize) {
1663       stopb = blocksize;
1664     }
1665     bsize2 = stopb - startb;
1666 
1667     /* Do the actual data copy */
1668     if (flags & BLOSC_MEMCPYED) {
1669       /* We want to memcpy only */
1670       fastcopy((uint8_t *) dest + ntbytes,
1671                (uint8_t *) src + BLOSC_MAX_OVERHEAD + j * blocksize + startb, bsize2);
1672       cbytes = bsize2;
1673     }
1674     else {
1675       /* Regular decompression.  Put results in tmp2. */
1676       cbytes = blosc_d(&context, bsize, leftoverblock,
1677                        (uint8_t *)src, sw32_(bstarts + j * 4),
1678                        tmp2, tmp, tmp3);
1679       if (cbytes < 0) {
1680         ntbytes = cbytes;
1681         break;
1682       }
1683       /* Copy to destination */
1684       fastcopy((uint8_t *) dest + ntbytes, tmp2 + startb, bsize2);
1685       cbytes = bsize2;
1686     }
1687     ntbytes += cbytes;
1688   }
1689 
1690   my_free(tmp);
1691 
1692   return ntbytes;
1693 }
1694 
1695 /* Decompress & unshuffle several blocks in a single thread */
t_blosc(void * ctxt)1696 static void *t_blosc(void *ctxt)
1697 {
1698   struct thread_context* context = (struct thread_context*)ctxt;
1699   int32_t cbytes, ntdest;
1700   int32_t tblocks;              /* number of blocks per thread */
1701   int32_t leftover2;
1702   int32_t tblock;               /* limit block on a thread */
1703   int32_t nblock_;              /* private copy of nblock */
1704   int32_t bsize, leftoverblock;
1705   /* Parameters for threads */
1706   int32_t blocksize;
1707   int32_t ebsize;
1708   int32_t compress;
1709   int32_t maxbytes;
1710   int32_t ntbytes;
1711   int32_t flags;
1712   int32_t nblocks;
1713   int32_t leftover;
1714   uint8_t *bstarts;
1715   const uint8_t *src;
1716   uint8_t *dest;
1717   uint8_t *tmp;
1718   uint8_t *tmp2;
1719   uint8_t *tmp3;
1720   int rc;
1721   (void)rc;  // just to avoid 'unused-variable' warning
1722 
1723   while(1)
1724   {
1725     /* Synchronization point for all threads (wait for initialization) */
1726     WAIT_INIT(NULL, context->parent_context);
1727 
1728     if(context->parent_context->end_threads)
1729     {
1730       break;
1731     }
1732 
1733     /* Get parameters for this thread before entering the main loop */
1734     blocksize = context->parent_context->blocksize;
1735     ebsize = blocksize + context->parent_context->typesize * (int32_t)sizeof(int32_t);
1736     compress = context->parent_context->compress;
1737     flags = *(context->parent_context->header_flags);
1738     maxbytes = context->parent_context->destsize;
1739     nblocks = context->parent_context->nblocks;
1740     leftover = context->parent_context->leftover;
1741     bstarts = context->parent_context->bstarts;
1742     src = context->parent_context->src;
1743     dest = context->parent_context->dest;
1744 
1745     if (blocksize > context->tmpblocksize)
1746     {
1747       my_free(context->tmp);
1748       context->tmp = my_malloc(blocksize + ebsize + blocksize);
1749       context->tmp2 = context->tmp + blocksize;
1750       context->tmp3 = context->tmp + blocksize + ebsize;
1751     }
1752 
1753     tmp = context->tmp;
1754     tmp2 = context->tmp2;
1755     tmp3 = context->tmp3;
1756 
1757     ntbytes = 0;                /* only useful for decompression */
1758 
1759     if (compress && !(flags & BLOSC_MEMCPYED)) {
1760       /* Compression always has to follow the block order */
1761       pthread_mutex_lock(&context->parent_context->count_mutex);
1762       context->parent_context->thread_nblock++;
1763       nblock_ = context->parent_context->thread_nblock;
1764       pthread_mutex_unlock(&context->parent_context->count_mutex);
1765       tblock = nblocks;
1766     }
1767     else {
1768       /* Decompression can happen using any order.  We choose
1769        sequential block order on each thread */
1770 
1771       /* Blocks per thread */
1772       tblocks = nblocks / context->parent_context->numthreads;
1773       leftover2 = nblocks % context->parent_context->numthreads;
1774       tblocks = (leftover2>0)? tblocks+1: tblocks;
1775 
1776       nblock_ = context->tid*tblocks;
1777       tblock = nblock_ + tblocks;
1778       if (tblock > nblocks) {
1779         tblock = nblocks;
1780       }
1781     }
1782 
1783     /* Loop over blocks */
1784     leftoverblock = 0;
1785     while ((nblock_ < tblock) && context->parent_context->thread_giveup_code > 0) {
1786       bsize = blocksize;
1787       if (nblock_ == (nblocks - 1) && (leftover > 0)) {
1788         bsize = leftover;
1789         leftoverblock = 1;
1790       }
1791       if (compress) {
1792         if (flags & BLOSC_MEMCPYED) {
1793           /* We want to memcpy only */
1794           fastcopy(dest + BLOSC_MAX_OVERHEAD + nblock_ * blocksize,
1795                    src + nblock_ * blocksize, bsize);
1796           cbytes = bsize;
1797         }
1798         else {
1799           /* Regular compression */
1800           cbytes = blosc_c(context->parent_context, bsize, leftoverblock, 0, ebsize,
1801                            src+nblock_*blocksize, tmp2, tmp, tmp3);
1802         }
1803       }
1804       else {
1805         if (flags & BLOSC_MEMCPYED) {
1806           /* We want to memcpy only */
1807           fastcopy(dest + nblock_ * blocksize,
1808                    src + BLOSC_MAX_OVERHEAD + nblock_ * blocksize, bsize);
1809           cbytes = bsize;
1810         }
1811         else {
1812           cbytes = blosc_d(context->parent_context, bsize, leftoverblock,
1813                            src, sw32_(bstarts + nblock_ * 4),
1814                            dest+nblock_*blocksize,
1815                            tmp, tmp2);
1816         }
1817       }
1818 
1819       /* Check whether current thread has to giveup */
1820       if (context->parent_context->thread_giveup_code <= 0) {
1821         break;
1822       }
1823 
1824       /* Check results for the compressed/decompressed block */
1825       if (cbytes < 0) {            /* compr/decompr failure */
1826         /* Set giveup_code error */
1827         pthread_mutex_lock(&context->parent_context->count_mutex);
1828         context->parent_context->thread_giveup_code = cbytes;
1829         pthread_mutex_unlock(&context->parent_context->count_mutex);
1830         break;
1831       }
1832 
1833       if (compress && !(flags & BLOSC_MEMCPYED)) {
1834         /* Start critical section */
1835         pthread_mutex_lock(&context->parent_context->count_mutex);
1836         ntdest = context->parent_context->num_output_bytes;
1837         _sw32(bstarts + nblock_ * 4, ntdest); /* update block start counter */
1838         if ( (cbytes == 0) || (ntdest+cbytes > maxbytes) ) {
1839           context->parent_context->thread_giveup_code = 0;  /* uncompressible buffer */
1840           pthread_mutex_unlock(&context->parent_context->count_mutex);
1841           break;
1842         }
1843         context->parent_context->thread_nblock++;
1844         nblock_ = context->parent_context->thread_nblock;
1845         context->parent_context->num_output_bytes += cbytes;           /* update return bytes counter */
1846         pthread_mutex_unlock(&context->parent_context->count_mutex);
1847         /* End of critical section */
1848 
1849         /* Copy the compressed buffer to destination */
1850         fastcopy(dest + ntdest, tmp2, cbytes);
1851       }
1852       else {
1853         nblock_++;
1854         /* Update counter for this thread */
1855         ntbytes += cbytes;
1856       }
1857 
1858     } /* closes while (nblock_) */
1859 
1860     /* Sum up all the bytes decompressed */
1861     if ((!compress || (flags & BLOSC_MEMCPYED)) && context->parent_context->thread_giveup_code > 0) {
1862       /* Update global counter for all threads (decompression only) */
1863       pthread_mutex_lock(&context->parent_context->count_mutex);
1864       context->parent_context->num_output_bytes += ntbytes;
1865       pthread_mutex_unlock(&context->parent_context->count_mutex);
1866     }
1867 
1868     /* Meeting point for all threads (wait for finalization) */
1869     WAIT_FINISH(NULL, context->parent_context);
1870   }
1871 
1872   /* Cleanup our working space and context */
1873   my_free(context->tmp);
1874   my_free(context);
1875 
1876   return(NULL);
1877 }
1878 
1879 
init_threads(struct blosc_context * context)1880 static int init_threads(struct blosc_context* context)
1881 {
1882   int32_t tid;
1883   int rc2;
1884   int32_t ebsize;
1885   struct thread_context* thread_context;
1886 
1887   /* Initialize mutex and condition variable objects */
1888   pthread_mutex_init(&context->count_mutex, NULL);
1889 
1890   /* Set context thread sentinels */
1891   context->thread_giveup_code = 1;
1892   context->thread_nblock = -1;
1893 
1894   /* Barrier initialization */
1895 #ifdef _POSIX_BARRIERS_MINE
1896   pthread_barrier_init(&context->barr_init, NULL, context->numthreads+1);
1897   pthread_barrier_init(&context->barr_finish, NULL, context->numthreads+1);
1898 #else
1899   pthread_mutex_init(&context->count_threads_mutex, NULL);
1900   pthread_cond_init(&context->count_threads_cv, NULL);
1901   context->count_threads = 0;      /* Reset threads counter */
1902 #endif
1903 
1904 #if !defined(_WIN32)
1905   /* Initialize and set thread detached attribute */
1906   pthread_attr_init(&context->ct_attr);
1907   pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
1908 #endif
1909 
1910   /* Finally, create the threads in detached state */
1911   for (tid = 0; tid < context->numthreads; tid++) {
1912     context->tids[tid] = tid;
1913 
1914     /* Create a thread context thread owns context (will destroy when finished) */
1915     thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
1916     thread_context->parent_context = context;
1917     thread_context->tid = tid;
1918 
1919     ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
1920     thread_context->tmp = my_malloc(context->blocksize + ebsize + context->blocksize);
1921     thread_context->tmp2 = thread_context->tmp + context->blocksize;
1922     thread_context->tmp3 = thread_context->tmp + context->blocksize + ebsize;
1923     thread_context->tmpblocksize = context->blocksize;
1924 
1925 #if !defined(_WIN32)
1926     rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc, (void *)thread_context);
1927 #else
1928     rc2 = pthread_create(&context->threads[tid], NULL, t_blosc, (void *)thread_context);
1929 #endif
1930     if (rc2) {
1931       fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc2);
1932       fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
1933       return(-1);
1934     }
1935   }
1936 
1937 
1938   return(0);
1939 }
1940 
blosc_get_nthreads(void)1941 int blosc_get_nthreads(void)
1942 {
1943   int ret = g_threads;
1944 
1945   return ret;
1946 }
1947 
blosc_set_nthreads(int nthreads_new)1948 int blosc_set_nthreads(int nthreads_new)
1949 {
1950   int ret = g_threads;
1951 
1952   /* Check if should initialize */
1953   if (!g_initlib) blosc_init();
1954 
1955   if (nthreads_new != ret){
1956     /* Re-initialize Blosc */
1957     blosc_destroy();
1958     blosc_init();
1959     g_threads = nthreads_new;
1960   }
1961 
1962   return ret;
1963 }
1964 
blosc_set_nthreads_(struct blosc_context * context)1965 int blosc_set_nthreads_(struct blosc_context* context)
1966 {
1967   if (context->numthreads > BLOSC_MAX_THREADS) {
1968     fprintf(stderr,
1969             "Error.  nthreads cannot be larger than BLOSC_MAX_THREADS (%d)",
1970             BLOSC_MAX_THREADS);
1971     return -1;
1972   }
1973   else if (context->numthreads <= 0) {
1974     fprintf(stderr, "Error.  nthreads must be a positive integer");
1975     return -1;
1976   }
1977 
1978   /* Launch a new pool of threads */
1979   if (context->numthreads > 1 && context->numthreads != context->threads_started) {
1980     blosc_release_threadpool(context);
1981     if (init_threads(context) < 0) {
1982       return -1;
1983     }
1984   }
1985 
1986   /* We have now started the threads */
1987   context->threads_started = context->numthreads;
1988 
1989   return context->numthreads;
1990 }
1991 
blosc_get_compressor(void)1992 const char* blosc_get_compressor(void)
1993 {
1994   const char* compname;
1995   blosc_compcode_to_compname(g_compressor, &compname);
1996 
1997   return compname;
1998 }
1999 
blosc_set_compressor(const char * compname)2000 int blosc_set_compressor(const char *compname)
2001 {
2002   int code = blosc_compname_to_compcode(compname);
2003 
2004   g_compressor = code;
2005 
2006   /* Check if should initialize */
2007   if (!g_initlib) blosc_init();
2008 
2009   return code;
2010 }
2011 
blosc_list_compressors(void)2012 const char* blosc_list_compressors(void)
2013 {
2014   static int compressors_list_done = 0;
2015   static char ret[256];
2016 
2017   if (compressors_list_done) return ret;
2018   ret[0] = '\0';
2019   strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
2020 #if defined(HAVE_LZ4)
2021   strcat(ret, ","); strcat(ret, BLOSC_LZ4_COMPNAME);
2022   strcat(ret, ","); strcat(ret, BLOSC_LZ4HC_COMPNAME);
2023 #endif /* HAVE_LZ4 */
2024 #if defined(HAVE_SNAPPY)
2025   strcat(ret, ","); strcat(ret, BLOSC_SNAPPY_COMPNAME);
2026 #endif /* HAVE_SNAPPY */
2027 #if defined(HAVE_ZLIB)
2028   strcat(ret, ","); strcat(ret, BLOSC_ZLIB_COMPNAME);
2029 #endif /* HAVE_ZLIB */
2030 #if defined(HAVE_ZSTD)
2031   strcat(ret, ","); strcat(ret, BLOSC_ZSTD_COMPNAME);
2032 #endif /* HAVE_ZSTD */
2033   compressors_list_done = 1;
2034   return ret;
2035 }
2036 
blosc_get_version_string(void)2037 const char* blosc_get_version_string(void)
2038 {
2039   return BLOSC_VERSION_STRING;
2040 }
2041 
blosc_get_complib_info(const char * compname,char ** complib,char ** version)2042 int blosc_get_complib_info(const char *compname, char **complib, char **version)
2043 {
2044   int clibcode;
2045   const char *clibname;
2046   const char *clibversion = "unknown";
2047 
2048 #if (defined(HAVE_LZ4) && defined(LZ4_VERSION_MAJOR)) || (defined(HAVE_SNAPPY) && defined(SNAPPY_VERSION)) || defined(ZSTD_VERSION_MAJOR)
2049   char sbuffer[256];
2050 #endif
2051 
2052   clibcode = compname_to_clibcode(compname);
2053   clibname = clibcode_to_clibname(clibcode);
2054 
2055   /* complib version */
2056   if (clibcode == BLOSC_BLOSCLZ_LIB) {
2057     clibversion = BLOSCLZ_VERSION_STRING;
2058   }
2059 #if defined(HAVE_LZ4)
2060   else if (clibcode == BLOSC_LZ4_LIB) {
2061 #if defined(LZ4_VERSION_MAJOR)
2062     sprintf(sbuffer, "%d.%d.%d",
2063             LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
2064     clibversion = sbuffer;
2065 #endif /* LZ4_VERSION_MAJOR */
2066   }
2067 #endif /* HAVE_LZ4 */
2068 #if defined(HAVE_SNAPPY)
2069   else if (clibcode == BLOSC_SNAPPY_LIB) {
2070 #if defined(SNAPPY_VERSION)
2071     sprintf(sbuffer, "%d.%d.%d", SNAPPY_MAJOR, SNAPPY_MINOR, SNAPPY_PATCHLEVEL);
2072     clibversion = sbuffer;
2073 #endif /* SNAPPY_VERSION */
2074   }
2075 #endif /* HAVE_SNAPPY */
2076 #if defined(HAVE_ZLIB)
2077   else if (clibcode == BLOSC_ZLIB_LIB) {
2078     clibversion = ZLIB_VERSION;
2079   }
2080 #endif /* HAVE_ZLIB */
2081 #if defined(HAVE_ZSTD)
2082   else if (clibcode == BLOSC_ZSTD_LIB) {
2083     sprintf(sbuffer, "%d.%d.%d",
2084             ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
2085     clibversion = sbuffer;
2086   }
2087 #endif /* HAVE_ZSTD */
2088   else {
2089     /* Unsupported library */
2090     if (complib != NULL) *complib = NULL;
2091     if (version != NULL) *version = NULL;
2092     return -1;
2093   }
2094 
2095   if (complib != NULL) *complib = strdup(clibname);
2096   if (version != NULL) *version = strdup(clibversion);
2097 
2098   return clibcode;
2099 }
2100 
2101 /* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */
blosc_cbuffer_sizes(const void * cbuffer,size_t * nbytes,size_t * cbytes,size_t * blocksize)2102 void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes,
2103                          size_t *cbytes, size_t *blocksize)
2104 {
2105   uint8_t *_src = (uint8_t *)(cbuffer);    /* current pos for source buffer */
2106   uint8_t version = _src[0];               /* version of header */
2107 
2108   if (version != BLOSC_VERSION_FORMAT) {
2109     *nbytes = *blocksize = *cbytes = 0;
2110     return;
2111   }
2112 
2113   /* Read the interesting values */
2114   *nbytes = (size_t)sw32_(_src + 4);       /* uncompressed buffer size */
2115   *blocksize = (size_t)sw32_(_src + 8);    /* block size */
2116   *cbytes = (size_t)sw32_(_src + 12);      /* compressed buffer size */
2117 }
2118 
blosc_cbuffer_validate(const void * cbuffer,size_t cbytes,size_t * nbytes)2119 int blosc_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) {
2120   size_t header_cbytes, header_blocksize;
2121   if (cbytes < BLOSC_MIN_HEADER_LENGTH) return -1;
2122   blosc_cbuffer_sizes(cbuffer, nbytes, &header_cbytes, &header_blocksize);
2123   if (header_cbytes != cbytes) return -1;
2124   if (*nbytes > BLOSC_MAX_BUFFERSIZE) return -1;
2125   return 0;
2126 }
2127 
2128 /* Return `typesize` and `flags` from a compressed buffer. */
blosc_cbuffer_metainfo(const void * cbuffer,size_t * typesize,int * flags)2129 void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize,
2130                             int *flags)
2131 {
2132   uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */
2133 
2134   uint8_t version = _src[0];               /* version of header */
2135 
2136   if (version != BLOSC_VERSION_FORMAT) {
2137     *flags = *typesize = 0;
2138     return;
2139   }
2140 
2141   /* Read the interesting values */
2142   *flags = (int)_src[2] & 7;             /* first three flags */
2143   *typesize = (size_t)_src[3];           /* typesize */
2144 }
2145 
2146 
/* Read the version information from the header of a compressed buffer:
   `*version` receives header byte 0 (blosc format version) and `*versionlz`
   header byte 1 (Lempel-Ziv compressor format version). */
void blosc_cbuffer_versions(const void *cbuffer, int *version,
                            int *versionlz)
{
  const uint8_t *header = (const uint8_t *)cbuffer;

  *version = (int)header[0];
  *versionlz = (int)header[1];
}
2157 
2158 
/* Return the name of the compressor library/format recorded in a compressed
   buffer.  The library code is taken from the three highest bits of header
   byte 2 and translated with clibcode_to_clibname(). */
const char *blosc_cbuffer_complib(const void *cbuffer)
{
  const uint8_t *header = (const uint8_t *)cbuffer;
  const int libcode = (header[2] & 0xe0) >> 5;

  return clibcode_to_clibname(libcode);
}
2171 
2172 /* Get the internal blocksize to be used during compression.  0 means
2173    that an automatic blocksize is computed internally. */
blosc_get_blocksize(void)2174 int blosc_get_blocksize(void)
2175 {
2176   return (int)g_force_blocksize;
2177 }
2178 
2179 /* Force the use of a specific blocksize.  If 0, an automatic
2180    blocksize will be used (the default). */
blosc_set_blocksize(size_t size)2181 void blosc_set_blocksize(size_t size)
2182 {
2183   g_force_blocksize = (int32_t)size;
2184 }
2185 
2186 /* Force the use of a specific split mode. */
blosc_set_splitmode(int mode)2187 void blosc_set_splitmode(int mode)
2188 {
2189   g_splitmode = mode;
2190 }
2191 
2192 /* Child global context is invalid and pool threads no longer exist post-fork.
2193  * Discard the old, inconsistent global context and global context mutex and
2194  * mark as uninitialized.  Subsequent calls through `blosc_*` interfaces will
2195  * trigger re-init of the global context.
2196  *
2197  * All pthread interfaces have undefined behavior in child handler in current
2198  * posix standards: http://pubs.opengroup.org/onlinepubs/9699919799/
2199  */
blosc_atfork_child(void)2200 void blosc_atfork_child(void) {
2201   if (!g_initlib) return;
2202 
2203   g_initlib = 0;
2204 
2205   my_free(global_comp_mutex);
2206   global_comp_mutex = NULL;
2207 
2208   my_free(g_global_context);
2209   g_global_context = NULL;
2210 
2211 }
2212 
blosc_init(void)2213 void blosc_init(void)
2214 {
2215   /* Return if we are already initialized */
2216   if (g_initlib) return;
2217 
2218   global_comp_mutex = (pthread_mutex_t*)my_malloc(sizeof(pthread_mutex_t));
2219   pthread_mutex_init(global_comp_mutex, NULL);
2220 
2221   g_global_context = (struct blosc_context*)my_malloc(sizeof(struct blosc_context));
2222   g_global_context->threads_started = 0;
2223 
2224   #if !defined(_WIN32)
2225   /* atfork handlers are only be registered once, though multiple re-inits may
2226    * occur via blosc_destroy/blosc_init.  */
2227   if (!g_atfork_registered) {
2228     g_atfork_registered = 1;
2229     pthread_atfork(NULL, NULL, &blosc_atfork_child);
2230   }
2231   #endif
2232 
2233   g_initlib = 1;
2234 }
2235 
blosc_destroy(void)2236 void blosc_destroy(void)
2237 {
2238   /* Return if Blosc is not initialized */
2239   if (!g_initlib) return;
2240 
2241   g_initlib = 0;
2242 
2243   blosc_release_threadpool(g_global_context);
2244   my_free(g_global_context);
2245   g_global_context = NULL;
2246 
2247   pthread_mutex_destroy(global_comp_mutex);
2248   my_free(global_comp_mutex);
2249   global_comp_mutex = NULL;
2250 }
2251 
blosc_release_threadpool(struct blosc_context * context)2252 int blosc_release_threadpool(struct blosc_context* context)
2253 {
2254   int32_t t;
2255   void* status;
2256   int rc;
2257   int rc2;
2258   (void)rc;  // just to avoid 'unused-variable' warning
2259 
2260   if (context->threads_started > 0)
2261   {
2262     /* Tell all existing threads to finish */
2263     context->end_threads = 1;
2264 
2265     /* Sync threads */
2266     WAIT_INIT(-1, context);
2267 
2268     /* Join exiting threads */
2269     for (t=0; t<context->threads_started; t++) {
2270       rc2 = pthread_join(context->threads[t], &status);
2271       if (rc2) {
2272         fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2);
2273         fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
2274       }
2275     }
2276 
2277     /* Release mutex and condition variable objects */
2278     pthread_mutex_destroy(&context->count_mutex);
2279 
2280     /* Barriers */
2281   #ifdef _POSIX_BARRIERS_MINE
2282       pthread_barrier_destroy(&context->barr_init);
2283       pthread_barrier_destroy(&context->barr_finish);
2284   #else
2285       pthread_mutex_destroy(&context->count_threads_mutex);
2286       pthread_cond_destroy(&context->count_threads_cv);
2287   #endif
2288 
2289       /* Thread attributes */
2290   #if !defined(_WIN32)
2291       pthread_attr_destroy(&context->ct_attr);
2292   #endif
2293 
2294   }
2295 
2296   context->threads_started = 0;
2297 
2298   return 0;
2299 }
2300 
blosc_free_resources(void)2301 int blosc_free_resources(void)
2302 {
2303   /* Return if Blosc is not initialized */
2304   if (!g_initlib) return -1;
2305 
2306   return blosc_release_threadpool(g_global_context);
2307 }
2308