1 /*********************************************************************
2   Blosc - Blocked Shuffling and Compression Library
3 
4   Author: Francesc Alted <francesc@blosc.org>
5   Creation date: 2009-05-20
6 
7   See LICENSES/BLOSC.txt for details about copyright and rights to use.
8 **********************************************************************/
9 
10 
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <errno.h>
14 #include <string.h>
15 #include <sys/types.h>
16 #include <assert.h>
17 
18 #include "fastcopy.h"
19 
20 #if defined(USING_CMAKE)
21   #include "config.h"
22 #endif /*  USING_CMAKE */
23 #include "blosc.h"
24 #include "shuffle.h"
25 #include "blosclz.h"
26 #if defined(HAVE_LZ4)
27   #include "lz4.h"
28   #include "lz4hc.h"
29 #endif /*  HAVE_LZ4 */
30 #if defined(HAVE_SNAPPY)
31   #include "snappy-c.h"
32 #endif /*  HAVE_SNAPPY */
33 #if defined(HAVE_ZLIB)
34   #include "zlib.h"
35 #endif /*  HAVE_ZLIB */
36 #if defined(HAVE_ZSTD)
37   #include "zstd.h"
38 #endif /*  HAVE_ZSTD */
39 
40 #if defined(_WIN32) && !defined(__MINGW32__)
41   #include <windows.h>
42   #include <malloc.h>
43 
44   /* stdint.h only available in VS2010 (VC++ 16.0) and newer */
45   #if defined(_MSC_VER) && _MSC_VER < 1600
46     #include "win32/stdint-windows.h"
47   #else
48     #include <stdint.h>
49   #endif
50 
51   #include <process.h>
52   #define getpid _getpid
53 #else
54   #include <stdint.h>
55   #include <unistd.h>
56   #include <inttypes.h>
57 #endif  /* _WIN32 */
58 
59 /* Include the win32/pthread.h library for all the Windows builds. See #224. */
60 #if defined(_WIN32)
61   #include "win32/pthread.h"
62   #include "win32/pthread.c"
63 #else
64   #include <pthread.h>
65 #endif
66 
67 
/* Some useful units */
#define KB 1024
#define MB (1024 * (KB))

/* Minimum buffer size to be compressed */
#define MIN_BUFFERSIZE 128       /* Cannot be smaller than 66 */

/* The maximum number of splits in a block for compression.
   blosc_d() also uses it as a compatibility bound on typesize when deciding
   whether a block was split. */
#define MAX_SPLITS 16            /* Cannot be larger than 128 */

/* The size of L1 cache.  32 KB is quite common nowadays. */
#define L1 (32 * (KB))

/* Have problems using posix barriers when symbol value is 200112L */
/* This requires more investigation, but will work for the moment */
/* When _POSIX_BARRIERS_MINE is defined, WAIT_INIT/WAIT_FINISH use
   pthread barriers; otherwise they fall back to a mutex + condition
   variable.
   NOTE(review): `20012L` below looks like a typo for `200112L`.  As written,
   the first test only requires _POSIX_BARRIERS >= 20012, which still rejects
   the "unsupported" values (-1 / 0).  Left unchanged pending confirmation. */
#if defined(_POSIX_BARRIERS) && ( (_POSIX_BARRIERS - 20012L) >= 0 && _POSIX_BARRIERS != 200112L)
#define _POSIX_BARRIERS_MINE
#endif
/* Synchronization variables */
87 
88 
/* State for one compression or decompression job, including its thread pool. */
struct blosc_context {
  int32_t compress;               /* 1 if we are doing compression 0 if decompress */

  const uint8_t* src;             /* Source buffer of the current job */
  uint8_t* dest;                  /* Start of the destination buffer; serial_blosc() writes at dest + offset */
  uint8_t* header_flags;          /* Flags for header (points at the flags byte; read as *header_flags) */
  int compversion;                /* Compressor version byte, only used during decompression */
  int32_t sourcesize;             /* Number of bytes in source buffer (or uncompressed bytes in compressed file) */
  int32_t compressedsize;         /* Number of bytes of compressed data (only used when decompressing) */
  int32_t nblocks;                /* Number of total blocks in buffer */
  int32_t leftover;               /* Extra bytes at end of buffer */
  int32_t blocksize;              /* Length of the block in bytes */
  int32_t typesize;               /* Type size */
  int32_t num_output_bytes;       /* Counter for the number of output bytes */
  int32_t destsize;               /* Maximum size for destination buffer */
  uint8_t* bstarts;               /* Start of the buffer past header info; one 4-byte little-endian block offset per block */
  int32_t compcode;               /* Compressor code to use (BLOSC_BLOSCLZ, BLOSC_LZ4, ...) */
  int clevel;                     /* Compression level (1-9) */
  /* Function to use for decompression.  Only used when decompression.
     Selected by initialize_decompress_func() from the header's format bits. */
  int (*decompress_func)(const void* input, int compressed_length, void* output,
                         int maxout);

  /* Threading */
  int32_t numthreads;             /* Requested thread count; 1 selects the serial path in do_job() */
  int32_t threads_started;        /* presumably the size of the running pool -- confirm */
  int32_t end_threads;            /* presumably a flag asking workers to exit -- confirm */
  pthread_t threads[BLOSC_MAX_THREADS];
  int32_t tids[BLOSC_MAX_THREADS];
  pthread_mutex_t count_mutex;
  /* Barrier state consumed by the WAIT_INIT / WAIT_FINISH macros */
  #ifdef _POSIX_BARRIERS_MINE
  pthread_barrier_t barr_init;
  pthread_barrier_t barr_finish;
  #else
  int32_t count_threads;
  pthread_mutex_t count_threads_mutex;
  pthread_cond_t count_threads_cv;
  #endif
  #if !defined(_WIN32)
  pthread_attr_t ct_attr;            /* creation time attrs for threads */
  #endif
  int32_t thread_giveup_code;               /* error code when give up; parallel_blosc() resets it to 1 and treats <= 0 as failure */
  int32_t thread_nblock;                    /* block counter; parallel_blosc() resets it to -1 */
};
132 
/* Per-worker state.  How these fields are used lies outside this view.
   NOTE(review): tmp/tmp2/tmp3 are presumably per-thread scratch buffers of
   tmpblocksize-related size -- confirm against the allocation code. */
struct thread_context {
  struct blosc_context* parent_context;   /* job context shared by the pool */
  int32_t tid;                            /* worker index (assumed) */
  uint8_t* tmp;
  uint8_t* tmp2;
  uint8_t* tmp3;
  int32_t tmpblocksize; /* Used to keep track of how big the temporary buffers are */
};
141 
/* Global context for non-contextual API */
static struct blosc_context* g_global_context;
static pthread_mutex_t* global_comp_mutex;    /* presumably guards g_global_context -- confirm */
static int32_t g_compressor = BLOSC_BLOSCLZ;  /* the compressor to use by default */
static int32_t g_threads = 1;                 /* default number of threads (presumably for the global API) */
static int32_t g_force_blocksize = 0;         /* forced blocksize; 0 presumably means "automatic" -- confirm */
static int32_t g_initlib = 0;                 /* library-initialized flag (assumed; set outside this view) */
static int32_t g_atfork_registered = 0;       /* atfork-handler-registered flag (assumed) */
static int32_t g_splitmode = BLOSC_FORWARD_COMPAT_SPLIT;  /* default block-splitting mode */



/* Wrapped function to adjust the number of threads used by blosc.
   Forward declaration; called by parallel_blosc() below. */
int blosc_set_nthreads_(struct blosc_context*);

/* Releases the global threadpool */
int blosc_release_threadpool(struct blosc_context* context);
159 
/* Macros for synchronization
 *
 * Both macros are statement-like and must be used as `WAIT_INIT(ret, ctx);`.
 * Each expands to a single do { ... } while (0) statement, so it is safe
 * after an unbraced if/else.  CONTEXT_PTR is parenthesized in the expansion,
 * so any pointer expression may be passed.
 *
 * RET_VAL is returned from the *enclosing* function when a barrier wait
 * fails (barrier build only).  The barrier build also assigns to a variable
 * named `rc`, which the caller must declare.
 */

/* Wait until all threads are initialized */
#ifdef _POSIX_BARRIERS_MINE
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                                  \
  do {                                                                   \
    rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_init);               \
    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {                \
      printf("Could not wait on barrier (init): %d\n", rc);             \
      return((RET_VAL));                                                 \
    }                                                                    \
  } while (0)
#else
/* Hand-rolled barrier.  While count_threads < numthreads, an arriving
   thread increments the counter and sleeps.  Otherwise it broadcasts to
   release the sleepers.
   NOTE(review): pthread_cond_wait() is not re-checked in a loop, so a
   spurious wakeup lets a waiter pass early.  Fixing that properly needs a
   generation counter in struct blosc_context. */
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                                  \
  do {                                                                   \
    pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);            \
    if ((CONTEXT_PTR)->count_threads < (CONTEXT_PTR)->numthreads) {      \
      (CONTEXT_PTR)->count_threads++;                                    \
      pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,               \
                        &(CONTEXT_PTR)->count_threads_mutex);           \
    }                                                                    \
    else {                                                               \
      pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv);         \
    }                                                                    \
    pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);          \
  } while (0)
#endif

/* Wait for all threads to finish */
#ifdef _POSIX_BARRIERS_MINE
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                                \
  do {                                                                   \
    rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_finish);             \
    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {                \
      printf("Could not wait on barrier (finish)\n");                    \
      return((RET_VAL));                                                 \
    }                                                                    \
  } while (0)
#else
/* Mirror image of WAIT_INIT: counts count_threads back down to 0; the
   arrival that finds it at 0 broadcasts.  Same spurious-wakeup caveat. */
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                                \
  do {                                                                   \
    pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);            \
    if ((CONTEXT_PTR)->count_threads > 0) {                              \
      (CONTEXT_PTR)->count_threads--;                                    \
      pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,               \
                        &(CONTEXT_PTR)->count_threads_mutex);           \
    }                                                                    \
    else {                                                               \
      pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv);         \
    }                                                                    \
    pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);          \
  } while (0)
#endif
203 
204 
/* A function for aligned malloc that is portable.
 *
 * Returns a block of `size` bytes, aligned to 32 bytes where the platform
 * offers an aligned allocator (_aligned_malloc / posix_memalign).  The
 * plain malloc() fallback gives only malloc's default alignment.  Returns
 * NULL on failure.  Release the block with my_free(): on Windows,
 * _aligned_malloc memory must go back through _aligned_free.
 */
static uint8_t *my_malloc(size_t size)
{
  void *block = NULL;
  int res = 0;

/* Do an alignment to 32 bytes because AVX2 is supported */
#if defined(_WIN32)
  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
  block = (void *)_aligned_malloc(size, 32);
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
  /* Platform does have an implementation of posix_memalign */
  res = posix_memalign(&block, 32, size);
#else
  block = malloc(size);
#endif  /* _WIN32 */

  if (block == NULL || res != 0) {
    /* Send diagnostics to stderr, like the rest of this file, so that a
       library allocation failure never pollutes the caller's stdout. */
    fprintf(stderr, "Error allocating memory!\n");
    return NULL;
  }

  return (uint8_t *)block;
}
229 
230 
/* Release memory obtained from my_malloc().
 * The release routine mirrors the allocator used there: Windows memory
 * comes from _aligned_malloc and needs _aligned_free; elsewhere plain
 * free() is correct (posix_memalign and malloc blocks alike).
 */
static void my_free(void *block)
{
#if !defined(_WIN32)
  free(block);
#else
  _aligned_free(block);
#endif  /* !_WIN32 */
}
240 
241 
/* Decode the little-endian 32-bit integer stored in the 4 bytes at `pa`
   into a native int32_t, whatever the host byte order is. */
static int32_t sw32_(const uint8_t *pa)
{
  uint32_t value = (uint32_t)pa[0]
                 | ((uint32_t)pa[1] << 8)
                 | ((uint32_t)pa[2] << 16)
                 | ((uint32_t)pa[3] << 24);

  /* Reinterpret the bit pattern as two's complement (values >= 2^31
     become negative), exactly like copying the bytes into an int32_t. */
  return (int32_t)value;
}
266 
267 
/* Store `a` into the 4 bytes at `dest` as a little-endian 32-bit integer,
   whatever the host byte order is. */
static void _sw32(uint8_t* dest, int32_t a)
{
  uint32_t bits = (uint32_t)a;   /* conversion to unsigned is modulo 2^32 */
  int shift;

  for (shift = 0; shift < 32; shift += 8) {
    *dest++ = (uint8_t)(bits >> shift);   /* least significant byte first */
  }
}
290 
291 /*
292  * Conversion routines between compressor and compression libraries
293  */
294 
295 /* Return the library code associated with the compressor name */
compname_to_clibcode(const char * compname)296 static int compname_to_clibcode(const char *compname)
297 {
298   if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
299     return BLOSC_BLOSCLZ_LIB;
300   if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
301     return BLOSC_LZ4_LIB;
302   if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
303     return BLOSC_LZ4_LIB;
304   if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0)
305     return BLOSC_SNAPPY_LIB;
306   if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
307     return BLOSC_ZLIB_LIB;
308   if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
309     return BLOSC_ZSTD_LIB;
310   return -1;
311 }
312 
313 /* Return the library name associated with the compressor code */
clibcode_to_clibname(int clibcode)314 static const char *clibcode_to_clibname(int clibcode)
315 {
316   if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
317   if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
318   if (clibcode == BLOSC_SNAPPY_LIB) return BLOSC_SNAPPY_LIBNAME;
319   if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
320   if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
321   return NULL;                  /* should never happen */
322 }
323 
324 
325 /*
326  * Conversion routines between compressor names and compressor codes
327  */
328 
329 /* Get the compressor name associated with the compressor code */
blosc_compcode_to_compname(int compcode,const char ** compname)330 int blosc_compcode_to_compname(int compcode, const char **compname)
331 {
332   int code = -1;    /* -1 means non-existent compressor code */
333   const char *name = NULL;
334 
335   /* Map the compressor code */
336   if (compcode == BLOSC_BLOSCLZ)
337     name = BLOSC_BLOSCLZ_COMPNAME;
338   else if (compcode == BLOSC_LZ4)
339     name = BLOSC_LZ4_COMPNAME;
340   else if (compcode == BLOSC_LZ4HC)
341     name = BLOSC_LZ4HC_COMPNAME;
342   else if (compcode == BLOSC_SNAPPY)
343     name = BLOSC_SNAPPY_COMPNAME;
344   else if (compcode == BLOSC_ZLIB)
345     name = BLOSC_ZLIB_COMPNAME;
346   else if (compcode == BLOSC_ZSTD)
347     name = BLOSC_ZSTD_COMPNAME;
348 
349   *compname = name;
350 
351   /* Guess if there is support for this code */
352   if (compcode == BLOSC_BLOSCLZ)
353     code = BLOSC_BLOSCLZ;
354 #if defined(HAVE_LZ4)
355   else if (compcode == BLOSC_LZ4)
356     code = BLOSC_LZ4;
357   else if (compcode == BLOSC_LZ4HC)
358     code = BLOSC_LZ4HC;
359 #endif /*  HAVE_LZ4 */
360 #if defined(HAVE_SNAPPY)
361   else if (compcode == BLOSC_SNAPPY)
362     code = BLOSC_SNAPPY;
363 #endif /*  HAVE_SNAPPY */
364 #if defined(HAVE_ZLIB)
365   else if (compcode == BLOSC_ZLIB)
366     code = BLOSC_ZLIB;
367 #endif /*  HAVE_ZLIB */
368 #if defined(HAVE_ZSTD)
369   else if (compcode == BLOSC_ZSTD)
370     code = BLOSC_ZSTD;
371 #endif /*  HAVE_ZSTD */
372 
373   return code;
374 }
375 
376 /* Get the compressor code for the compressor name. -1 if it is not available */
blosc_compname_to_compcode(const char * compname)377 int blosc_compname_to_compcode(const char *compname)
378 {
379   int code = -1;  /* -1 means non-existent compressor code */
380 
381   if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
382     code = BLOSC_BLOSCLZ;
383   }
384 #if defined(HAVE_LZ4)
385   else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
386     code = BLOSC_LZ4;
387   }
388   else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
389     code = BLOSC_LZ4HC;
390   }
391 #endif /*  HAVE_LZ4 */
392 #if defined(HAVE_SNAPPY)
393   else if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0) {
394     code = BLOSC_SNAPPY;
395   }
396 #endif /*  HAVE_SNAPPY */
397 #if defined(HAVE_ZLIB)
398   else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
399     code = BLOSC_ZLIB;
400   }
401 #endif /*  HAVE_ZLIB */
402 #if defined(HAVE_ZSTD)
403   else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
404     code = BLOSC_ZSTD;
405   }
406 #endif /*  HAVE_ZSTD */
407 
408 return code;
409 }
410 
411 
#if defined(HAVE_LZ4)
/* Compress with LZ4 in fast mode.  `accel` is LZ4's acceleration factor
   (see get_accel()).  Returns LZ4_compress_fast()'s result: the compressed
   size, or 0 when the output did not fit in `maxout`. */
static int lz4_wrap_compress(const char* input, size_t input_length,
                             char* output, size_t maxout, int accel)
{
  int cbytes;
  cbytes = LZ4_compress_fast(input, output, (int)input_length, (int)maxout,
                             accel);
  return cbytes;
}

/* Compress with LZ4HC at level `clevel`.  Returns the compressed size,
   0 when the output did not fit, or -1 when the input is too large for
   LZ4's int-sized length parameter. */
static int lz4hc_wrap_compress(const char* input, size_t input_length,
                               char* output, size_t maxout, int clevel)
{
  int cbytes;
  /* input_length is narrowed to int below, so reject anything that does
     not fit in int32_t (2 GB and up).  Do not spell this bound `2<<30`:
     that expression overflows a signed int, which is undefined behaviour. */
  if (input_length > (size_t)INT32_MAX)
    return -1;   /* input of 2 GB or larger is not supported */
  /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
   * but levels larger than 9 does not buy much compression. */
  cbytes = LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
                           clevel);
  return cbytes;
}

/* Bounds-checked LZ4 decompression of `compressed_length` bytes into at
   most `maxout` bytes.  Returns LZ4_decompress_safe()'s result: the
   decompressed size, or a negative value on malformed input. */
static int lz4_wrap_decompress(const void* input, int compressed_length,
                               void* output, int maxout)
{
  return LZ4_decompress_safe(input, output, compressed_length, maxout);
}

/* "Unsafe" LZ4 decompression that trusts `maxout` as the exact decoded
   size.  Returns `maxout` when the decoder consumed exactly
   `compressed_length` input bytes, and 0 otherwise. */
static int lz4_wrap_decompress_unsafe(const void* input, int compressed_length,
                                      void* output, int maxout)
{
  /* LZ4_decompress_fast() returns an int (input bytes read, negative on
     error).  Keep it an int so the comparison below is not a
     signed/unsigned mix. */
  int cbytes;
  cbytes = LZ4_decompress_fast(input, output, maxout);
  if (cbytes != compressed_length) {
    return 0;
  }
  return maxout;
}

#endif /* HAVE_LZ4 */
453 
#if defined(HAVE_SNAPPY)
/* Snappy compression into at most `maxout` bytes.
   Returns the compressed length, or 0 if Snappy reports any error. */
static int snappy_wrap_compress(const char* input, size_t input_length,
                                char* output, size_t maxout)
{
  size_t out_len = maxout;   /* in: capacity of output, out: bytes written */

  if (snappy_compress(input, input_length, output, &out_len) != SNAPPY_OK) {
    return 0;
  }
  return (int)out_len;
}

/* Snappy decompression of `compressed_length` bytes into at most `maxout`
   bytes.  Returns the decompressed length, or 0 if Snappy reports any error. */
static int snappy_wrap_decompress(const void* input, int compressed_length,
                                  void* output, int maxout)
{
  size_t out_len = maxout;   /* in: capacity of output, out: bytes produced */

  if (snappy_uncompress(input, compressed_length, output, &out_len)
      != SNAPPY_OK) {
    return 0;
  }
  return (int)out_len;
}
#endif /* HAVE_SNAPPY */
479 
#if defined(HAVE_ZLIB)
/* zlib is not very respectful with sharing name space with others.
 Fortunately, its names do not collide with those already in blosc. */

/* One-shot zlib compression (compress2) into at most `maxout` bytes.
   Returns the compressed size, or 0 if zlib returns anything but Z_OK. */
static int zlib_wrap_compress(const char* input, size_t input_length,
                              char* output, size_t maxout, int clevel)
{
  uLongf dest_len = maxout;   /* in: capacity of output, out: bytes used */
  const int rc = compress2((Bytef*)output, &dest_len, (Bytef*)input,
                           (uLong)input_length, clevel);

  return (rc == Z_OK) ? (int)dest_len : 0;
}

/* One-shot zlib decompression (uncompress) into at most `maxout` bytes.
   Returns the decompressed size, or 0 if zlib returns anything but Z_OK. */
static int zlib_wrap_decompress(const void* input, int compressed_length,
                                void* output, int maxout) {
  uLongf dest_len = maxout;   /* in: capacity of output, out: bytes produced */
  const int rc = uncompress((Bytef*)output, &dest_len, (Bytef*)input,
                            (uLong)compressed_length);

  return (rc == Z_OK) ? (int)dest_len : 0;
}
#endif /*  HAVE_ZLIB */
508 
#if defined(HAVE_ZSTD)
/* Compress with Zstandard, mapping Blosc's clevel onto zstd's scale:
 *   clevel <= 7 -> 2*clevel - 1
 *   clevel == 8 -> ZSTD_maxCLevel() - 2
 *   clevel >= 9 -> ZSTD_maxCLevel()
 * Returns the compressed size, or 0 on any zstd error (including an
 * output that does not fit in `maxout`). */
static int zstd_wrap_compress(const char* input, size_t input_length,
                              char* output, size_t maxout, int clevel) {
  size_t code;
  int zlevel;

  if (clevel >= 9) {
    zlevel = ZSTD_maxCLevel();
  }
  else if (clevel == 8) {
    /* Make the level 8 close enough to maxCLevel.  This must test the
       Blosc level: 2*clevel - 1 is always odd, so a test on the mapped
       value could never equal 8 and the branch would be dead. */
    zlevel = ZSTD_maxCLevel() - 2;
  }
  else {
    zlevel = clevel * 2 - 1;
  }
  code = ZSTD_compress(output, maxout, input, input_length, zlevel);
  if (ZSTD_isError(code)) {
    return 0;
  }
  return (int)code;
}

/* Decompress a zstd frame of `compressed_length` bytes into at most
   `maxout` bytes.  Returns the decompressed size, or 0 on any zstd error. */
static int zstd_wrap_decompress(const void* input, int compressed_length,
                                void* output, int maxout) {
  size_t code;
  code = ZSTD_decompress(output, maxout, input, compressed_length);
  if (ZSTD_isError(code)) {
    return 0;
  }
  return (int)code;
}
#endif /*  HAVE_ZSTD */
535 
initialize_decompress_func(struct blosc_context * context,int unsafe)536 static int initialize_decompress_func(struct blosc_context* context,
537                                       int unsafe) {
538   int8_t header_flags = *(context->header_flags);
539   int32_t compformat = (header_flags & 0xe0) >> 5;
540   int compversion = context->compversion;
541 
542   if (compformat == BLOSC_BLOSCLZ_FORMAT) {
543     if (compversion != BLOSC_BLOSCLZ_VERSION_FORMAT) {
544       return -9;
545     }
546     context->decompress_func =
547         unsafe ? &blosclz_decompress_unsafe : &blosclz_decompress;
548     return 0;
549   }
550 #if defined(HAVE_LZ4)
551   if (compformat == BLOSC_LZ4_FORMAT) {
552     if (compversion != BLOSC_LZ4_VERSION_FORMAT) {
553       return -9;
554     }
555     context->decompress_func =
556         unsafe ? &lz4_wrap_decompress_unsafe : &lz4_wrap_decompress;
557     return 0;
558   }
559 #endif /*  HAVE_LZ4 */
560 #if defined(HAVE_SNAPPY)
561   if (compformat == BLOSC_SNAPPY_FORMAT) {
562     if (compversion != BLOSC_SNAPPY_VERSION_FORMAT) {
563       return -9;
564     }
565     context->decompress_func = &snappy_wrap_decompress;
566     return 0;
567   }
568 #endif /*  HAVE_SNAPPY */
569 #if defined(HAVE_ZLIB)
570   if (compformat == BLOSC_ZLIB_FORMAT) {
571     if (compversion != BLOSC_ZLIB_VERSION_FORMAT) {
572       return -9;
573     }
574     context->decompress_func = &zlib_wrap_decompress;
575     return 0;
576   }
577 #endif /*  HAVE_ZLIB */
578 #if defined(HAVE_ZSTD)
579   if (compformat == BLOSC_ZSTD_FORMAT) {
580     if (compversion != BLOSC_ZSTD_VERSION_FORMAT) {
581       return -9;
582     }
583     context->decompress_func = &zstd_wrap_decompress;
584     return 0;
585   }
586 #endif /*  HAVE_ZSTD */
587   return -5; /* signals no decompression support */
588 }
589 
590 /* Compute acceleration for blosclz */
get_accel(const struct blosc_context * context)591 static int get_accel(const struct blosc_context* context) {
592   int32_t clevel = context->clevel;
593 
594   if (context->compcode == BLOSC_LZ4) {
595     /* This acceleration setting based on discussions held in:
596      * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
597      */
598     return (10 - clevel);
599   }
600   return 1;
601 }
602 
603 
604 /* Shuffle & compress a single block */
blosc_c(const struct blosc_context * context,int32_t blocksize,int32_t leftoverblock,int32_t ntbytes,int32_t maxbytes,const uint8_t * src,uint8_t * dest,uint8_t * tmp,uint8_t * tmp2)605 static int blosc_c(const struct blosc_context* context, int32_t blocksize,
606                    int32_t leftoverblock, int32_t ntbytes, int32_t maxbytes,
607                    const uint8_t *src, uint8_t *dest, uint8_t *tmp,
608                    uint8_t *tmp2)
609 {
610   int8_t header_flags = *(context->header_flags);
611   int dont_split = (header_flags & 0x10) >> 4;
612   int32_t j, neblock, nsplits;
613   int32_t cbytes;                   /* number of compressed bytes in split */
614   int32_t ctbytes = 0;              /* number of compressed bytes in block */
615   int32_t maxout;
616   int32_t typesize = context->typesize;
617   const uint8_t *_tmp = src;
618   const char *compname;
619   int accel;
620   int bscount;
621   int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
622   int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
623                       (blocksize >= typesize));
624 
625   if (doshuffle) {
626     /* Byte shuffling only makes sense if typesize > 1 */
627     blosc_internal_shuffle(typesize, blocksize, src, tmp);
628     _tmp = tmp;
629   }
630   /* We don't allow more than 1 filter at the same time (yet) */
631   else if (dobitshuffle) {
632     bscount = blosc_internal_bitshuffle(typesize, blocksize, src, tmp, tmp2);
633     if (bscount < 0)
634       return bscount;
635     _tmp = tmp;
636   }
637 
638   /* Calculate acceleration for different compressors */
639   accel = get_accel(context);
640 
641   /* The number of splits for this block */
642   if (!dont_split && !leftoverblock) {
643     nsplits = typesize;
644   }
645   else {
646     nsplits = 1;
647   }
648   neblock = blocksize / nsplits;
649   for (j = 0; j < nsplits; j++) {
650     dest += sizeof(int32_t);
651     ntbytes += (int32_t)sizeof(int32_t);
652     ctbytes += (int32_t)sizeof(int32_t);
653     maxout = neblock;
654     #if defined(HAVE_SNAPPY)
655     if (context->compcode == BLOSC_SNAPPY) {
656       /* TODO perhaps refactor this to keep the value stashed somewhere */
657       maxout = snappy_max_compressed_length(neblock);
658     }
659     #endif /*  HAVE_SNAPPY */
660     if (ntbytes+maxout > maxbytes) {
661       maxout = maxbytes - ntbytes;   /* avoid buffer overrun */
662       if (maxout <= 0) {
663         return 0;                  /* non-compressible block */
664       }
665     }
666     if (context->compcode == BLOSC_BLOSCLZ) {
667       cbytes = blosclz_compress(context->clevel, _tmp+j*neblock, neblock,
668                                 dest, maxout);
669     }
670     #if defined(HAVE_LZ4)
671     else if (context->compcode == BLOSC_LZ4) {
672       cbytes = lz4_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
673                                  (char *)dest, (size_t)maxout, accel);
674     }
675     else if (context->compcode == BLOSC_LZ4HC) {
676       cbytes = lz4hc_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
677                                    (char *)dest, (size_t)maxout,
678                                    context->clevel);
679     }
680     #endif /* HAVE_LZ4 */
681     #if defined(HAVE_SNAPPY)
682     else if (context->compcode == BLOSC_SNAPPY) {
683       cbytes = snappy_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
684                                     (char *)dest, (size_t)maxout);
685     }
686     #endif /* HAVE_SNAPPY */
687     #if defined(HAVE_ZLIB)
688     else if (context->compcode == BLOSC_ZLIB) {
689       cbytes = zlib_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
690                                   (char *)dest, (size_t)maxout,
691                                   context->clevel);
692     }
693     #endif /* HAVE_ZLIB */
694     #if defined(HAVE_ZSTD)
695     else if (context->compcode == BLOSC_ZSTD) {
696       cbytes = zstd_wrap_compress((char*)_tmp + j * neblock, (size_t)neblock,
697                                   (char*)dest, (size_t)maxout, context->clevel);
698     }
699     #endif /* HAVE_ZSTD */
700 
701     else {
702       blosc_compcode_to_compname(context->compcode, &compname);
703       fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
704       fprintf(stderr, "compression support.  Please use one having it.");
705       return -5;    /* signals no compression support */
706     }
707 
708     if (cbytes > maxout) {
709       /* Buffer overrun caused by compression (should never happen) */
710       return -1;
711     }
712     else if (cbytes < 0) {
713       /* cbytes should never be negative */
714       return -2;
715     }
716     else if (cbytes == 0 || cbytes == neblock) {
717       /* The compressor has been unable to compress data at all. */
718       /* Before doing the copy, check that we are not running into a
719          buffer overflow. */
720       if ((ntbytes+neblock) > maxbytes) {
721         return 0;    /* Non-compressible data */
722       }
723       blosc_internal_fastcopy(dest, _tmp + j * neblock, neblock);
724       cbytes = neblock;
725     }
726     _sw32(dest - 4, cbytes);
727     dest += cbytes;
728     ntbytes += cbytes;
729     ctbytes += cbytes;
730   }  /* Closes j < nsplits */
731 
732   return ctbytes;
733 }
734 
735 /* Decompress & unshuffle a single block */
blosc_d(struct blosc_context * context,int32_t blocksize,int32_t leftoverblock,const uint8_t * base_src,int32_t src_offset,uint8_t * dest,uint8_t * tmp,uint8_t * tmp2)736 static int blosc_d(struct blosc_context* context, int32_t blocksize,
737                    int32_t leftoverblock, const uint8_t* base_src,
738                    int32_t src_offset, uint8_t* dest, uint8_t* tmp,
739                    uint8_t* tmp2) {
740   int8_t header_flags = *(context->header_flags);
741   int dont_split = (header_flags & 0x10) >> 4;
742   int32_t j, neblock, nsplits;
743   int32_t nbytes;                /* number of decompressed bytes in split */
744   const int32_t compressedsize = context->compressedsize;
745   int32_t cbytes;                /* number of compressed bytes in split */
746   int32_t ctbytes = 0;           /* number of compressed bytes in block */
747   int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
748   uint8_t *_tmp = dest;
749   int32_t typesize = context->typesize;
750   int bscount;
751   int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
752   int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
753                       (blocksize >= typesize));
754   const uint8_t* src;
755 
756   if (doshuffle || dobitshuffle) {
757     _tmp = tmp;
758   }
759 
760   /* The number of splits for this block */
761   if (!dont_split &&
762       /* For compatibility with before the introduction of the split flag */
763       ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE) &&
764       !leftoverblock) {
765     nsplits = typesize;
766   }
767   else {
768     nsplits = 1;
769   }
770 
771   neblock = blocksize / nsplits;
772   for (j = 0; j < nsplits; j++) {
773     /* Validate src_offset */
774     if (src_offset < 0 || src_offset > compressedsize - sizeof(int32_t)) {
775       return -1;
776     }
777     cbytes = sw32_(base_src + src_offset); /* amount of compressed bytes */
778     src_offset += sizeof(int32_t);
779     /* Validate cbytes */
780     if (cbytes < 0 || cbytes > context->compressedsize - src_offset) {
781       return -1;
782     }
783     ctbytes += (int32_t)sizeof(int32_t);
784     src = base_src + src_offset;
785     /* Uncompress */
786     if (cbytes == neblock) {
787       blosc_internal_fastcopy(_tmp, src, neblock);
788       nbytes = neblock;
789     }
790     else {
791       nbytes = context->decompress_func(src, cbytes, _tmp, neblock);
792       /* Check that decompressed bytes number is correct */
793       if (nbytes != neblock) {
794         return -2;
795       }
796     }
797     src_offset += cbytes;
798     ctbytes += cbytes;
799     _tmp += nbytes;
800     ntbytes += nbytes;
801   } /* Closes j < nsplits */
802 
803   if (doshuffle) {
804     blosc_internal_unshuffle(typesize, blocksize, tmp, dest);
805   }
806   else if (dobitshuffle) {
807     bscount = blosc_internal_bitunshuffle(typesize, blocksize, tmp, dest, tmp2);
808     if (bscount < 0)
809       return bscount;
810   }
811 
812   /* Return the number of uncompressed bytes */
813   return ntbytes;
814 }
815 
816 /* Serial version for compression/decompression */
serial_blosc(struct blosc_context * context)817 static int serial_blosc(struct blosc_context* context)
818 {
819   int32_t j, bsize, leftoverblock;
820   int32_t cbytes;
821 
822   int32_t ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
823   int32_t ntbytes = context->num_output_bytes;
824 
825   uint8_t *tmp = my_malloc(context->blocksize + ebsize);
826   uint8_t *tmp2 = tmp + context->blocksize;
827 
828   for (j = 0; j < context->nblocks; j++) {
829     if (context->compress && !(*(context->header_flags) & BLOSC_MEMCPYED)) {
830       _sw32(context->bstarts + j * 4, ntbytes);
831     }
832     bsize = context->blocksize;
833     leftoverblock = 0;
834     if ((j == context->nblocks - 1) && (context->leftover > 0)) {
835       bsize = context->leftover;
836       leftoverblock = 1;
837     }
838     if (context->compress) {
839       if (*(context->header_flags) & BLOSC_MEMCPYED) {
840         /* We want to memcpy only */
841         blosc_internal_fastcopy(context->dest + BLOSC_MAX_OVERHEAD + j * context->blocksize,
842                                 context->src + j * context->blocksize, bsize);
843         cbytes = bsize;
844       }
845       else {
846         /* Regular compression */
847         cbytes = blosc_c(context, bsize, leftoverblock, ntbytes,
848                          context->destsize, context->src+j*context->blocksize,
849                          context->dest+ntbytes, tmp, tmp2);
850         if (cbytes == 0) {
851           ntbytes = 0;              /* uncompressible data */
852           break;
853         }
854       }
855     }
856     else {
857       if (*(context->header_flags) & BLOSC_MEMCPYED) {
858         /* We want to memcpy only */
859         blosc_internal_fastcopy(context->dest + j * context->blocksize,
860                                 context->src + BLOSC_MAX_OVERHEAD + j * context->blocksize, bsize);
861         cbytes = bsize;
862       }
863       else {
864         /* Regular decompression */
865         cbytes = blosc_d(context, bsize, leftoverblock, context->src,
866                          sw32_(context->bstarts + j * 4),
867                          context->dest + j * context->blocksize, tmp, tmp2);
868       }
869     }
870     if (cbytes < 0) {
871       ntbytes = cbytes;         /* error in blosc_c or blosc_d */
872       break;
873     }
874     ntbytes += cbytes;
875   }
876 
877   /* Free temporaries */
878   my_free(tmp);
879 
880   return ntbytes;
881 }
882 
883 
884 /* Threaded version for compression/decompression */
parallel_blosc(struct blosc_context * context)885 static int parallel_blosc(struct blosc_context* context)
886 {
887   int rc;
888   (void)rc;  // just to avoid 'unused-variable' warning
889 
890   /* Check whether we need to restart threads */
891   blosc_set_nthreads_(context);
892 
893   /* Set sentinels */
894   context->thread_giveup_code = 1;
895   context->thread_nblock = -1;
896 
897   /* Synchronization point for all threads (wait for initialization) */
898   WAIT_INIT(-1, context);
899 
900   /* Synchronization point for all threads (wait for finalization) */
901   WAIT_FINISH(-1, context);
902 
903   if (context->thread_giveup_code > 0) {
904     /* Return the total bytes (de-)compressed in threads */
905     return context->num_output_bytes;
906   }
907   else {
908     /* Compression/decompression gave up.  Return error code. */
909     return context->thread_giveup_code;
910   }
911 }
912 
913 
914 /* Do the compression or decompression of the buffer depending on the
915    global params. */
do_job(struct blosc_context * context)916 static int do_job(struct blosc_context* context)
917 {
918   int32_t ntbytes;
919 
920   /* Run the serial version when nthreads is 1 or when the buffers are
921      not much larger than blocksize */
922   if (context->numthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
923     ntbytes = serial_blosc(context);
924   }
925   else {
926     ntbytes = parallel_blosc(context);
927   }
928 
929   return ntbytes;
930 }
931 
932 
/* High Compression Ratio codecs: evaluates to 1 for LZ4HC, Zlib and Zstd,
   and to 0 for any other codec code.  `codec` may be evaluated up to three
   times. */
#define HCR(codec) ((((codec) == BLOSC_LZ4HC) || \
                     ((codec) == BLOSC_ZLIB)  || \
                     ((codec) == BLOSC_ZSTD)) ? 1 : 0)
938 
939 
940 /* Conditions for splitting a block before compressing with a codec. */
split_block(int compcode,int typesize,int blocksize)941 static int split_block(int compcode, int typesize, int blocksize) {
942   int splitblock = -1;
943 
944   switch (g_splitmode) {
945     case BLOSC_ALWAYS_SPLIT:
946       splitblock = 1;
947       break;
948     case BLOSC_NEVER_SPLIT:
949       splitblock = 0;
950       break;
951     case BLOSC_AUTO_SPLIT:
952       /* Normally all the compressors designed for speed benefit from a
953          split.  However, in conducted benchmarks LZ4 seems that it runs
954          faster if we don't split, which is quite surprising. */
955       splitblock= (((compcode == BLOSC_BLOSCLZ) ||
956                     (compcode == BLOSC_SNAPPY)) &&
957                    (typesize <= MAX_SPLITS) &&
958                    (blocksize / typesize) >= MIN_BUFFERSIZE);
959       break;
960     case BLOSC_FORWARD_COMPAT_SPLIT:
961       /* The zstd support was introduced at the same time than the split flag, so
962        * there should be not a problem with not splitting bloscks with it */
963       splitblock = ((compcode != BLOSC_ZSTD) &&
964                     (typesize <= MAX_SPLITS) &&
965                     (blocksize / typesize) >= MIN_BUFFERSIZE);
966       break;
967     default:
968       fprintf(stderr, "Split mode %d not supported", g_splitmode);
969   }
970   return splitblock;
971 }
972 
973 
compute_blocksize(struct blosc_context * context,int32_t clevel,int32_t typesize,int32_t nbytes,int32_t forced_blocksize)974 static int32_t compute_blocksize(struct blosc_context* context, int32_t clevel,
975                                  int32_t typesize, int32_t nbytes,
976                                  int32_t forced_blocksize)
977 {
978   int32_t blocksize;
979 
980   /* Protection against very small buffers */
981   if (nbytes < (int32_t)typesize) {
982     return 1;
983   }
984 
985   blocksize = nbytes;           /* Start by a whole buffer as blocksize */
986 
987   if (forced_blocksize) {
988     blocksize = forced_blocksize;
989     /* Check that forced blocksize is not too small */
990     if (blocksize < MIN_BUFFERSIZE) {
991       blocksize = MIN_BUFFERSIZE;
992     }
993     /* Check that forced blocksize is not too large */
994     if (blocksize > BLOSC_MAX_BLOCKSIZE) {
995       blocksize = BLOSC_MAX_BLOCKSIZE;
996     }
997   }
998   else if (nbytes >= L1) {
999     blocksize = L1;
1000 
1001     /* For HCR codecs, increase the block sizes by a factor of 2 because they
1002        are meant for compressing large blocks (i.e. they show a big overhead
1003        when compressing small ones). */
1004     if (HCR(context->compcode)) {
1005       blocksize *= 2;
1006     }
1007 
1008     switch (clevel) {
1009       case 0:
1010         /* Case of plain copy */
1011         blocksize /= 4;
1012         break;
1013       case 1:
1014         blocksize /= 2;
1015         break;
1016       case 2:
1017         blocksize *= 1;
1018         break;
1019       case 3:
1020         blocksize *= 2;
1021         break;
1022       case 4:
1023       case 5:
1024         blocksize *= 4;
1025         break;
1026       case 6:
1027       case 7:
1028       case 8:
1029         blocksize *= 8;
1030         break;
1031       case 9:
1032         blocksize *= 8;
1033         if (HCR(context->compcode)) {
1034           blocksize *= 2;
1035         }
1036         break;
1037       default:
1038         assert(0);
1039         break;
1040     }
1041   }
1042 
1043   /* Enlarge the blocksize for splittable codecs */
1044   if (clevel > 0 && split_block(context->compcode, typesize, blocksize)) {
1045     if (blocksize > (1 << 16)) {
1046       /* Do not use a too large split buffer (> 64 KB) for splitting codecs */
1047       blocksize = (1 << 16);
1048     }
1049     blocksize *= typesize;
1050     if (blocksize < (1 << 16)) {
1051       /* Do not use a too small blocksize (< 64 KB) when typesize is small */
1052       blocksize = (1 << 16);
1053     }
1054   }
1055 
1056   /* Check that blocksize is not too large */
1057   if (blocksize > (int32_t)nbytes) {
1058     blocksize = nbytes;
1059   }
1060 
1061   /* blocksize *must absolutely* be a multiple of the typesize */
1062   if (blocksize > typesize) {
1063     blocksize = blocksize / typesize * typesize;
1064   }
1065 
1066   return blocksize;
1067 }
1068 
initialize_context_compression(struct blosc_context * context,int clevel,int doshuffle,size_t typesize,size_t sourcesize,const void * src,void * dest,size_t destsize,int32_t compressor,int32_t blocksize,int32_t numthreads)1069 static int initialize_context_compression(struct blosc_context* context,
1070                           int clevel,
1071                           int doshuffle,
1072                           size_t typesize,
1073                           size_t sourcesize,
1074                           const void* src,
1075                           void* dest,
1076                           size_t destsize,
1077                           int32_t compressor,
1078                           int32_t blocksize,
1079                           int32_t numthreads)
1080 {
1081   /* Set parameters */
1082   context->compress = 1;
1083   context->src = (const uint8_t*)src;
1084   context->dest = (uint8_t *)(dest);
1085   context->num_output_bytes = 0;
1086   context->destsize = (int32_t)destsize;
1087   context->sourcesize = sourcesize;
1088   context->typesize = typesize;
1089   context->compcode = compressor;
1090   context->numthreads = numthreads;
1091   context->end_threads = 0;
1092   context->clevel = clevel;
1093 
1094   /* Check buffer size limits */
1095   if (sourcesize > BLOSC_MAX_BUFFERSIZE) {
1096     fprintf(stderr, "Input buffer size cannot exceed %d bytes\n",
1097             BLOSC_MAX_BUFFERSIZE);
1098     return -1;
1099   }
1100   if (destsize < BLOSC_MAX_OVERHEAD) {
1101     fprintf(stderr, "Output buffer size should be larger than %d bytes\n",
1102             BLOSC_MAX_OVERHEAD);
1103     return -1;
1104   }
1105 
1106   /* Compression level */
1107   if (clevel < 0 || clevel > 9) {
1108     /* If clevel not in 0..9, print an error */
1109     fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
1110     return -10;
1111   }
1112 
1113   /* Shuffle */
1114   if (doshuffle != 0 && doshuffle != 1 && doshuffle != 2) {
1115     fprintf(stderr, "`shuffle` parameter must be either 0, 1 or 2!\n");
1116     return -10;
1117   }
1118 
1119   /* Check typesize limits */
1120   if (context->typesize > BLOSC_MAX_TYPESIZE) {
1121     /* If typesize is too large, treat buffer as an 1-byte stream. */
1122     context->typesize = 1;
1123   }
1124 
1125   /* Get the blocksize */
1126   context->blocksize = compute_blocksize(context, clevel, (int32_t)context->typesize, context->sourcesize, blocksize);
1127 
1128   /* Compute number of blocks in buffer */
1129   context->nblocks = context->sourcesize / context->blocksize;
1130   context->leftover = context->sourcesize % context->blocksize;
1131   context->nblocks = (context->leftover > 0) ? (context->nblocks + 1) : context->nblocks;
1132 
1133   return 1;
1134 }
1135 
1136 
write_compression_header(struct blosc_context * context,int clevel,int doshuffle)1137 static int write_compression_header(struct blosc_context* context, int clevel, int doshuffle)
1138 {
1139   int32_t compformat;
1140   int dont_split;
1141 
1142   /* Write version header for this block */
1143   context->dest[0] = BLOSC_VERSION_FORMAT;           /* blosc format version */
1144 
1145   /* Write compressor format */
1146   compformat = -1;
1147   switch (context->compcode)
1148   {
1149   case BLOSC_BLOSCLZ:
1150     compformat = BLOSC_BLOSCLZ_FORMAT;
1151     context->dest[1] = BLOSC_BLOSCLZ_VERSION_FORMAT; /* blosclz format version */
1152     break;
1153 
1154 #if defined(HAVE_LZ4)
1155   case BLOSC_LZ4:
1156     compformat = BLOSC_LZ4_FORMAT;
1157     context->dest[1] = BLOSC_LZ4_VERSION_FORMAT;  /* lz4 format version */
1158     break;
1159   case BLOSC_LZ4HC:
1160     compformat = BLOSC_LZ4HC_FORMAT;
1161     context->dest[1] = BLOSC_LZ4HC_VERSION_FORMAT; /* lz4hc is the same as lz4 */
1162     break;
1163 #endif /* HAVE_LZ4 */
1164 
1165 #if defined(HAVE_SNAPPY)
1166   case BLOSC_SNAPPY:
1167     compformat = BLOSC_SNAPPY_FORMAT;
1168     context->dest[1] = BLOSC_SNAPPY_VERSION_FORMAT;    /* snappy format version */
1169     break;
1170 #endif /* HAVE_SNAPPY */
1171 
1172 #if defined(HAVE_ZLIB)
1173   case BLOSC_ZLIB:
1174     compformat = BLOSC_ZLIB_FORMAT;
1175     context->dest[1] = BLOSC_ZLIB_VERSION_FORMAT;      /* zlib format version */
1176     break;
1177 #endif /* HAVE_ZLIB */
1178 
1179 #if defined(HAVE_ZSTD)
1180   case BLOSC_ZSTD:
1181     compformat = BLOSC_ZSTD_FORMAT;
1182     context->dest[1] = BLOSC_ZSTD_VERSION_FORMAT;      /* zstd format version */
1183     break;
1184 #endif /* HAVE_ZSTD */
1185 
1186   default:
1187   {
1188     const char *compname;
1189     compname = clibcode_to_clibname(compformat);
1190     fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
1191     fprintf(stderr, "compression support.  Please use one having it.");
1192     return -5;    /* signals no compression support */
1193     break;
1194   }
1195   }
1196 
1197   context->header_flags = context->dest+2;  /* flags */
1198   context->dest[2] = 0;  /* zeroes flags */
1199   context->dest[3] = (uint8_t)context->typesize;  /* type size */
1200   _sw32(context->dest + 4, context->sourcesize);  /* size of the buffer */
1201   _sw32(context->dest + 8, context->blocksize);  /* block size */
1202   context->bstarts = context->dest + 16;  /* starts for every block */
1203   context->num_output_bytes = 16 + sizeof(int32_t)*context->nblocks;  /* space for header and pointers */
1204 
1205   if (context->clevel == 0) {
1206     /* Compression level 0 means buffer to be memcpy'ed */
1207     *(context->header_flags) |= BLOSC_MEMCPYED;
1208     context->num_output_bytes = 16;      /* space just for header */
1209   }
1210 
1211   if (context->sourcesize < MIN_BUFFERSIZE) {
1212     /* Buffer is too small.  Try memcpy'ing. */
1213     *(context->header_flags) |= BLOSC_MEMCPYED;
1214     context->num_output_bytes = 16;      /* space just for header */
1215   }
1216 
1217   if (doshuffle == BLOSC_SHUFFLE) {
1218     /* Byte-shuffle is active */
1219     *(context->header_flags) |= BLOSC_DOSHUFFLE;     /* bit 0 set to one in flags */
1220   }
1221 
1222   if (doshuffle == BLOSC_BITSHUFFLE) {
1223     /* Bit-shuffle is active */
1224     *(context->header_flags) |= BLOSC_DOBITSHUFFLE;  /* bit 2 set to one in flags */
1225   }
1226 
1227   dont_split = !split_block(context->compcode, context->typesize,
1228                             context->blocksize);
1229   *(context->header_flags) |= dont_split << 4;  /* dont_split is in bit 4 */
1230   *(context->header_flags) |= compformat << 5;  /* compressor format starts at bit 5 */
1231 
1232   return 1;
1233 }
1234 
1235 
blosc_compress_context(struct blosc_context * context)1236 int blosc_compress_context(struct blosc_context* context)
1237 {
1238   int32_t ntbytes = 0;
1239 
1240   if ((*(context->header_flags) & BLOSC_MEMCPYED) &&
1241       (context->sourcesize + BLOSC_MAX_OVERHEAD > context->destsize)) {
1242     return 0;   /* data cannot be copied without overrun destination */
1243   }
1244 
1245   /* Do the actual compression */
1246   ntbytes = do_job(context);
1247   if (ntbytes < 0) {
1248     return -1;
1249   }
1250   if ((ntbytes == 0) && (context->sourcesize + BLOSC_MAX_OVERHEAD <= context->destsize)) {
1251     /* Last chance for fitting `src` buffer in `dest`.  Update flags and force a copy. */
1252     *(context->header_flags) |= BLOSC_MEMCPYED;
1253     context->num_output_bytes = BLOSC_MAX_OVERHEAD;  /* reset the output bytes in previous step */
1254     ntbytes = do_job(context);
1255     if (ntbytes < 0) {
1256       return -1;
1257     }
1258   }
1259 
1260   /* Set the number of compressed bytes in header */
1261   _sw32(context->dest + 12, ntbytes);
1262 
1263   assert(ntbytes <= context->destsize);
1264   return ntbytes;
1265 }
1266 
1267 /* The public routine for compression with context. */
blosc_compress_ctx(int clevel,int doshuffle,size_t typesize,size_t nbytes,const void * src,void * dest,size_t destsize,const char * compressor,size_t blocksize,int numinternalthreads)1268 int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize,
1269                        size_t nbytes, const void* src, void* dest,
1270                        size_t destsize, const char* compressor,
1271                        size_t blocksize, int numinternalthreads)
1272 {
1273   int error, result;
1274   struct blosc_context context;
1275 
1276   context.threads_started = 0;
1277   error = initialize_context_compression(&context, clevel, doshuffle, typesize,
1278 					 nbytes, src, dest, destsize,
1279 					 blosc_compname_to_compcode(compressor),
1280 					 blocksize, numinternalthreads);
1281   if (error < 0) { return error; }
1282 
1283   error = write_compression_header(&context, clevel, doshuffle);
1284   if (error < 0) { return error; }
1285 
1286   result = blosc_compress_context(&context);
1287 
1288   if (numinternalthreads > 1)
1289   {
1290     blosc_release_threadpool(&context);
1291   }
1292 
1293   return result;
1294 }
1295 
1296 /* The public routine for compression.  See blosc.h for docstrings. */
blosc_compress(int clevel,int doshuffle,size_t typesize,size_t nbytes,const void * src,void * dest,size_t destsize)1297 int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
1298                    const void *src, void *dest, size_t destsize)
1299 {
1300   int result;
1301   char* envvar;
1302 
1303   /* Check if should initialize */
1304   if (!g_initlib) blosc_init();
1305 
1306   /* Check for environment variables */
1307   envvar = getenv("BLOSC_CLEVEL");
1308   if (envvar != NULL) {
1309     long value;
1310     value = strtol(envvar, NULL, 10);
1311     if ((value != EINVAL) && (value >= 0)) {
1312       clevel = (int)value;
1313     }
1314   }
1315 
1316   envvar = getenv("BLOSC_SHUFFLE");
1317   if (envvar != NULL) {
1318     if (strcmp(envvar, "NOSHUFFLE") == 0) {
1319       doshuffle = BLOSC_NOSHUFFLE;
1320     }
1321     if (strcmp(envvar, "SHUFFLE") == 0) {
1322       doshuffle = BLOSC_SHUFFLE;
1323     }
1324     if (strcmp(envvar, "BITSHUFFLE") == 0) {
1325       doshuffle = BLOSC_BITSHUFFLE;
1326     }
1327   }
1328 
1329   envvar = getenv("BLOSC_TYPESIZE");
1330   if (envvar != NULL) {
1331     long value;
1332     value = strtol(envvar, NULL, 10);
1333     if ((value != EINVAL) && (value > 0)) {
1334       typesize = (int)value;
1335     }
1336   }
1337 
1338   envvar = getenv("BLOSC_COMPRESSOR");
1339   if (envvar != NULL) {
1340     result = blosc_set_compressor(envvar);
1341     if (result < 0) { return result; }
1342   }
1343 
1344   envvar = getenv("BLOSC_BLOCKSIZE");
1345   if (envvar != NULL) {
1346     long blocksize;
1347     blocksize = strtol(envvar, NULL, 10);
1348     if ((blocksize != EINVAL) && (blocksize > 0)) {
1349       blosc_set_blocksize((size_t)blocksize);
1350     }
1351   }
1352 
1353   envvar = getenv("BLOSC_NTHREADS");
1354   if (envvar != NULL) {
1355     long nthreads;
1356     nthreads = strtol(envvar, NULL, 10);
1357     if ((nthreads != EINVAL) && (nthreads > 0)) {
1358       result = blosc_set_nthreads((int)nthreads);
1359       if (result < 0) { return result; }
1360     }
1361   }
1362 
1363   envvar = getenv("BLOSC_SPLITMODE");
1364   if (envvar != NULL) {
1365     if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
1366       blosc_set_splitmode(BLOSC_FORWARD_COMPAT_SPLIT);
1367     }
1368     else if (strcmp(envvar, "AUTO") == 0) {
1369       blosc_set_splitmode(BLOSC_AUTO_SPLIT);
1370     }
1371     else if (strcmp(envvar, "ALWAYS") == 0) {
1372       blosc_set_splitmode(BLOSC_ALWAYS_SPLIT);
1373     }
1374     else if (strcmp(envvar, "NEVER") == 0) {
1375       blosc_set_splitmode(BLOSC_NEVER_SPLIT);
1376     }
1377     else {
1378       fprintf(stderr, "BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
1379       return -1;
1380     }
1381   }
1382 
1383   /* Check for a BLOSC_NOLOCK environment variable.  It is important
1384      that this should be the last env var so that it can take the
1385      previous ones into account */
1386   envvar = getenv("BLOSC_NOLOCK");
1387   if (envvar != NULL) {
1388     const char *compname;
1389     blosc_compcode_to_compname(g_compressor, &compname);
1390     result = blosc_compress_ctx(clevel, doshuffle, typesize,
1391 				nbytes, src, dest, destsize,
1392 				compname, g_force_blocksize, g_threads);
1393     return result;
1394   }
1395 
1396   pthread_mutex_lock(global_comp_mutex);
1397 
1398   do {
1399     result = initialize_context_compression(g_global_context, clevel, doshuffle,
1400                                            typesize, nbytes, src, dest, destsize,
1401                                            g_compressor, g_force_blocksize,
1402                                            g_threads);
1403     if (result < 0) { break; }
1404 
1405     result = write_compression_header(g_global_context, clevel, doshuffle);
1406     if (result < 0) { break; }
1407 
1408     result = blosc_compress_context(g_global_context);
1409   } while (0);
1410 
1411   pthread_mutex_unlock(global_comp_mutex);
1412 
1413   return result;
1414 }
1415 
blosc_run_decompression_with_context(struct blosc_context * context,const void * src,void * dest,size_t destsize,int numinternalthreads,int unsafe)1416 static int blosc_run_decompression_with_context(struct blosc_context* context,
1417                                                 const void* src,
1418                                                 void* dest,
1419                                                 size_t destsize,
1420                                                 int numinternalthreads,
1421                                                 int unsafe)
1422 {
1423   uint8_t version;
1424   int32_t ntbytes;
1425 
1426   context->compress = 0;
1427   context->src = (const uint8_t*)src;
1428   context->dest = (uint8_t*)dest;
1429   context->destsize = destsize;
1430   context->num_output_bytes = 0;
1431   context->numthreads = numinternalthreads;
1432   context->end_threads = 0;
1433 
1434   /* Read the header block */
1435   version = context->src[0];                        /* blosc format version */
1436   context->compversion = context->src[1];
1437 
1438   context->header_flags = (uint8_t*)(context->src + 2);           /* flags */
1439   context->typesize = (int32_t)context->src[3];      /* typesize */
1440   context->sourcesize = sw32_(context->src + 4);     /* buffer size */
1441   context->blocksize = sw32_(context->src + 8);      /* block size */
1442   context->compressedsize = sw32_(context->src + 12); /* compressed buffer size */
1443   context->bstarts = (uint8_t*)(context->src + 16);
1444 
1445   if (context->sourcesize == 0) {
1446     /* Source buffer was empty, so we are done */
1447     return 0;
1448   }
1449 
1450   if (context->blocksize <= 0 || context->blocksize > destsize ||
1451       context->blocksize > BLOSC_MAX_BLOCKSIZE || context->typesize <= 0 ||
1452       context->typesize > BLOSC_MAX_TYPESIZE) {
1453     return -1;
1454   }
1455 
1456   if (version != BLOSC_VERSION_FORMAT) {
1457     /* Version from future */
1458     return -1;
1459   }
1460   if (*context->header_flags & 0x08) {
1461     /* compressor flags from the future */
1462     return -1;
1463   }
1464 
1465   /* Compute some params */
1466   /* Total blocks */
1467   context->nblocks = context->sourcesize / context->blocksize;
1468   context->leftover = context->sourcesize % context->blocksize;
1469   context->nblocks = (context->leftover>0)? context->nblocks+1: context->nblocks;
1470 
1471   /* Check that we have enough space to decompress */
1472   if (context->sourcesize > (int32_t)destsize) {
1473     return -1;
1474   }
1475 
1476   if (*(context->header_flags) & BLOSC_MEMCPYED) {
1477     /* Validate that compressed size is equal to decompressed size + header
1478        size. */
1479     if (context->sourcesize + BLOSC_MAX_OVERHEAD != context->compressedsize) {
1480       return -1;
1481     }
1482   } else {
1483     ntbytes = initialize_decompress_func(context, unsafe);
1484     if (ntbytes != 0) return ntbytes;
1485 
1486     /* Validate that compressed size is large enough to hold the bstarts array */
1487     if (context->nblocks > (context->compressedsize - 16) / 4) {
1488       return -1;
1489     }
1490   }
1491 
1492   /* Do the actual decompression */
1493   ntbytes = do_job(context);
1494   if (ntbytes < 0) {
1495     return -1;
1496   }
1497 
1498   assert(ntbytes <= (int32_t)destsize);
1499   return ntbytes;
1500 }
1501 
1502 /* Implementation of blosc_decompress_ctx{,_unsafe}. */
blosc_decompress_ctx_impl(const void * src,void * dest,size_t destsize,int numinternalthreads,int unsafe)1503 static int blosc_decompress_ctx_impl(const void* src, void* dest,
1504                                         size_t destsize, int numinternalthreads,
1505                                         int unsafe) {
1506   int result;
1507   struct blosc_context context;
1508 
1509   context.threads_started = 0;
1510   result = blosc_run_decompression_with_context(&context, src, dest, destsize,
1511                                                 numinternalthreads, unsafe);
1512 
1513   if (numinternalthreads > 1)
1514   {
1515     blosc_release_threadpool(&context);
1516   }
1517 
1518   return result;
1519 }
1520 
/* Public context-based decompression (safe variant). */
int blosc_decompress_ctx(const void* src, void* dest, size_t destsize,
                         int numinternalthreads) {
  const int unsafe = 0;
  return blosc_decompress_ctx_impl(src, dest, destsize, numinternalthreads, unsafe);
}
1526 
/* Public context-based decompression (`unsafe` variant). */
int blosc_decompress_ctx_unsafe(const void* src, void* dest, size_t destsize,
                                int numinternalthreads) {
  const int unsafe = 1;
  return blosc_decompress_ctx_impl(src, dest, destsize, numinternalthreads, unsafe);
}
1532 
1533 /* Implementation of blosc_decompress{,_unsafe}. */
blosc_decompress_impl(const void * src,void * dest,size_t destsize,int unsafe)1534 static int blosc_decompress_impl(const void* src, void* dest,
1535                                     size_t destsize, int unsafe) {
1536   int result;
1537   char* envvar;
1538   long nthreads;
1539 
1540   /* Check if should initialize */
1541   if (!g_initlib) blosc_init();
1542 
1543   /* Check for a BLOSC_NTHREADS environment variable */
1544   envvar = getenv("BLOSC_NTHREADS");
1545   if (envvar != NULL) {
1546     nthreads = strtol(envvar, NULL, 10);
1547     if ((nthreads != EINVAL) && (nthreads > 0)) {
1548       result = blosc_set_nthreads((int)nthreads);
1549       if (result < 0) { return result; }
1550     }
1551   }
1552 
1553   /* Check for a BLOSC_NOLOCK environment variable.  It is important
1554      that this should be the last env var so that it can take the
1555      previous ones into account */
1556   envvar = getenv("BLOSC_NOLOCK");
1557   if (envvar != NULL) {
1558     result = blosc_decompress_ctx(src, dest, destsize, g_threads);
1559     return result;
1560   }
1561 
1562   pthread_mutex_lock(global_comp_mutex);
1563 
1564   result = blosc_run_decompression_with_context(g_global_context, src, dest,
1565                                                 destsize, g_threads, unsafe);
1566 
1567   pthread_mutex_unlock(global_comp_mutex);
1568 
1569   return result;
1570 }
1571 
/* The public routine for decompression (safe variant).  See blosc.h for
   docstrings. */
int blosc_decompress(const void *src, void *dest, size_t destsize) {
  const int unsafe = 0;
  return blosc_decompress_impl(src, dest, destsize, unsafe);
}
1576 
/* Public decompression routine, `unsafe` variant. */
int blosc_decompress_unsafe(const void *src, void *dest, size_t destsize) {
  const int unsafe = 1;
  return blosc_decompress_impl(src, dest, destsize, unsafe);
}
1580 
1581 
1582 /* Implementation of blosc_getitem{,_unsafe}. */
blosc_getitem_impl(const void * src,int start,int nitems,void * dest,int unsafe)1583 static int blosc_getitem_impl(const void* src, int start, int nitems,
1584                               void* dest, int unsafe) {
1585   uint8_t *_src=NULL;               /* current pos for source buffer */
1586   uint8_t version, compversion;     /* versions for compressed header */
1587   uint8_t flags;                    /* flags for header */
1588   int32_t ntbytes = 0;              /* the number of uncompressed bytes */
1589   int32_t nblocks;                  /* number of total blocks in buffer */
1590   int32_t leftover;                 /* extra bytes at end of buffer */
1591   uint8_t *bstarts;                 /* start pointers for each block */
1592   int32_t typesize, blocksize, nbytes, compressedsize;
1593   int32_t j, bsize, bsize2, leftoverblock;
1594   int32_t cbytes, startb, stopb;
1595   int stop = start + nitems;
1596   uint8_t *tmp;
1597   uint8_t *tmp2;
1598   uint8_t *tmp3;
1599   int32_t ebsize;
1600   struct blosc_context context = {0};
1601 
1602   _src = (uint8_t *)(src);
1603 
1604   /* Read the header block */
1605   version = _src[0];                        /* blosc format version */
1606   compversion = _src[1];
1607   flags = _src[2];                          /* flags */
1608   typesize = (int32_t)_src[3];              /* typesize */
1609   nbytes = sw32_(_src + 4);                 /* buffer size */
1610   blocksize = sw32_(_src + 8);              /* block size */
1611   compressedsize = sw32_(_src + 12); /* compressed buffer size */
1612 
1613   if (version != BLOSC_VERSION_FORMAT)
1614     return -9;
1615 
1616   if (blocksize <= 0 || blocksize > nbytes || blocksize > BLOSC_MAX_BLOCKSIZE ||
1617       typesize <= 0 || typesize > BLOSC_MAX_TYPESIZE) {
1618     return -1;
1619   }
1620 
1621   /* Compute some params */
1622   /* Total blocks */
1623   nblocks = nbytes / blocksize;
1624   leftover = nbytes % blocksize;
1625   nblocks = (leftover>0)? nblocks+1: nblocks;
1626 
1627   /* Only initialize the fields blosc_d uses */
1628   context.typesize = typesize;
1629   context.header_flags = &flags;
1630   context.compversion = compversion;
1631   context.compressedsize = compressedsize;
1632   if (flags & BLOSC_MEMCPYED) {
1633     if (nbytes + BLOSC_MAX_OVERHEAD != compressedsize) {
1634       return -1;
1635     }
1636   } else {
1637     ntbytes = initialize_decompress_func(&context, /*unsafe=*/unsafe);
1638     if (ntbytes != 0) return ntbytes;
1639 
1640     if (nblocks >= (compressedsize - 16) / 4) {
1641       return -1;
1642     }
1643   }
1644 
1645   ebsize = blocksize + typesize * (int32_t)sizeof(int32_t);
1646   tmp = my_malloc(blocksize + ebsize + blocksize);
1647   tmp2 = tmp + blocksize;
1648   tmp3 = tmp + blocksize + ebsize;
1649 
1650   _src += 16;
1651   bstarts = _src;
1652   _src += sizeof(int32_t)*nblocks;
1653 
1654   /* Check region boundaries */
1655   if ((start < 0) || (start*typesize > nbytes)) {
1656     fprintf(stderr, "`start` out of bounds");
1657     return -1;
1658   }
1659 
1660   if ((stop < 0) || (stop*typesize > nbytes)) {
1661     fprintf(stderr, "`start`+`nitems` out of bounds");
1662     return -1;
1663   }
1664 
1665   for (j = 0; j < nblocks; j++) {
1666     bsize = blocksize;
1667     leftoverblock = 0;
1668     if ((j == nblocks - 1) && (leftover > 0)) {
1669       bsize = leftover;
1670       leftoverblock = 1;
1671     }
1672 
1673     /* Compute start & stop for each block */
1674     startb = start * typesize - j * blocksize;
1675     stopb = stop * typesize - j * blocksize;
1676     if ((startb >= (int)blocksize) || (stopb <= 0)) {
1677       continue;
1678     }
1679     if (startb < 0) {
1680       startb = 0;
1681     }
1682     if (stopb > (int)blocksize) {
1683       stopb = blocksize;
1684     }
1685     bsize2 = stopb - startb;
1686 
1687     /* Do the actual data copy */
1688     if (flags & BLOSC_MEMCPYED) {
1689       /* We want to memcpy only */
1690       blosc_internal_fastcopy((uint8_t *) dest + ntbytes,
1691                               (uint8_t *) src + BLOSC_MAX_OVERHEAD + j * blocksize + startb, bsize2);
1692       cbytes = bsize2;
1693     }
1694     else {
1695       /* Regular decompression.  Put results in tmp2. */
1696       cbytes = blosc_d(&context, bsize, leftoverblock,
1697                        (uint8_t *)src, sw32_(bstarts + j * 4),
1698                        tmp2, tmp, tmp3);
1699       if (cbytes < 0) {
1700         ntbytes = cbytes;
1701         break;
1702       }
1703       /* Copy to destination */
1704       blosc_internal_fastcopy((uint8_t *) dest + ntbytes, tmp2 + startb, bsize2);
1705       cbytes = bsize2;
1706     }
1707     ntbytes += cbytes;
1708   }
1709 
1710   my_free(tmp);
1711 
1712   return ntbytes;
1713 }
1714 
/* Specific routine optimized for decompressing a small number of items out
   of a compressed chunk.  It does not use threads, because that would hurt
   performance. */
int blosc_getitem(const void *src, int start, int nitems, void *dest) {
  const int unsafe = 0;   /* safe (non-`unsafe`) variant */
  return blosc_getitem_impl(src, start, nitems, dest, unsafe);
}
1721 
int blosc_getitem_unsafe(const void *src, int start, int nitems, void *dest) {
  /* Unchecked variant: same shared implementation, `unsafe` flag set. */
  const int unsafe = 1;
  return blosc_getitem_impl(src, start, nitems, dest, unsafe);
}
1725 
/* Compress or decompress several blocks in a single worker thread */
t_blosc(void * ctxt)1727 static void *t_blosc(void *ctxt)
1728 {
1729   struct thread_context* context = (struct thread_context*)ctxt;
1730   int32_t cbytes, ntdest;
1731   int32_t tblocks;              /* number of blocks per thread */
1732   int32_t leftover2;
1733   int32_t tblock;               /* limit block on a thread */
1734   int32_t nblock_;              /* private copy of nblock */
1735   int32_t bsize, leftoverblock;
1736   /* Parameters for threads */
1737   int32_t blocksize;
1738   int32_t ebsize;
1739   int32_t compress;
1740   int32_t maxbytes;
1741   int32_t ntbytes;
1742   int32_t flags;
1743   int32_t nblocks;
1744   int32_t leftover;
1745   uint8_t *bstarts;
1746   const uint8_t *src;
1747   uint8_t *dest;
1748   uint8_t *tmp;
1749   uint8_t *tmp2;
1750   uint8_t *tmp3;
1751   int rc;
1752   (void)rc;  // just to avoid 'unused-variable' warning
1753 
1754   while(1)
1755   {
1756     /* Synchronization point for all threads (wait for initialization) */
1757     WAIT_INIT(NULL, context->parent_context);
1758 
1759     if(context->parent_context->end_threads)
1760     {
1761       break;
1762     }
1763 
1764     /* Get parameters for this thread before entering the main loop */
1765     blocksize = context->parent_context->blocksize;
1766     ebsize = blocksize + context->parent_context->typesize * (int32_t)sizeof(int32_t);
1767     compress = context->parent_context->compress;
1768     flags = *(context->parent_context->header_flags);
1769     maxbytes = context->parent_context->destsize;
1770     nblocks = context->parent_context->nblocks;
1771     leftover = context->parent_context->leftover;
1772     bstarts = context->parent_context->bstarts;
1773     src = context->parent_context->src;
1774     dest = context->parent_context->dest;
1775 
1776     if (blocksize > context->tmpblocksize)
1777     {
1778       my_free(context->tmp);
1779       context->tmp = my_malloc(blocksize + ebsize + blocksize);
1780       context->tmp2 = context->tmp + blocksize;
1781       context->tmp3 = context->tmp + blocksize + ebsize;
1782     }
1783 
1784     tmp = context->tmp;
1785     tmp2 = context->tmp2;
1786     tmp3 = context->tmp3;
1787 
1788     ntbytes = 0;                /* only useful for decompression */
1789 
1790     if (compress && !(flags & BLOSC_MEMCPYED)) {
1791       /* Compression always has to follow the block order */
1792       pthread_mutex_lock(&context->parent_context->count_mutex);
1793       context->parent_context->thread_nblock++;
1794       nblock_ = context->parent_context->thread_nblock;
1795       pthread_mutex_unlock(&context->parent_context->count_mutex);
1796       tblock = nblocks;
1797     }
1798     else {
1799       /* Decompression can happen using any order.  We choose
1800        sequential block order on each thread */
1801 
1802       /* Blocks per thread */
1803       tblocks = nblocks / context->parent_context->numthreads;
1804       leftover2 = nblocks % context->parent_context->numthreads;
1805       tblocks = (leftover2>0)? tblocks+1: tblocks;
1806 
1807       nblock_ = context->tid*tblocks;
1808       tblock = nblock_ + tblocks;
1809       if (tblock > nblocks) {
1810         tblock = nblocks;
1811       }
1812     }
1813 
1814     /* Loop over blocks */
1815     leftoverblock = 0;
1816     while ((nblock_ < tblock) && context->parent_context->thread_giveup_code > 0) {
1817       bsize = blocksize;
1818       if (nblock_ == (nblocks - 1) && (leftover > 0)) {
1819         bsize = leftover;
1820         leftoverblock = 1;
1821       }
1822       if (compress) {
1823         if (flags & BLOSC_MEMCPYED) {
1824           /* We want to memcpy only */
1825           blosc_internal_fastcopy(dest + BLOSC_MAX_OVERHEAD + nblock_ * blocksize, src + nblock_ * blocksize,
1826                                   bsize);
1827           cbytes = bsize;
1828         }
1829         else {
1830           /* Regular compression */
1831           cbytes = blosc_c(context->parent_context, bsize, leftoverblock, 0, ebsize,
1832                            src+nblock_*blocksize, tmp2, tmp, tmp3);
1833         }
1834       }
1835       else {
1836         if (flags & BLOSC_MEMCPYED) {
1837           /* We want to memcpy only */
1838           blosc_internal_fastcopy(dest + nblock_ * blocksize, src + BLOSC_MAX_OVERHEAD + nblock_ * blocksize,
1839                                   bsize);
1840           cbytes = bsize;
1841         }
1842         else {
1843           cbytes = blosc_d(context->parent_context, bsize, leftoverblock,
1844                            src, sw32_(bstarts + nblock_ * 4),
1845                            dest+nblock_*blocksize,
1846                            tmp, tmp2);
1847         }
1848       }
1849 
1850       /* Check whether current thread has to giveup */
1851       if (context->parent_context->thread_giveup_code <= 0) {
1852         break;
1853       }
1854 
1855       /* Check results for the compressed/decompressed block */
1856       if (cbytes < 0) {            /* compr/decompr failure */
1857         /* Set giveup_code error */
1858         pthread_mutex_lock(&context->parent_context->count_mutex);
1859         context->parent_context->thread_giveup_code = cbytes;
1860         pthread_mutex_unlock(&context->parent_context->count_mutex);
1861         break;
1862       }
1863 
1864       if (compress && !(flags & BLOSC_MEMCPYED)) {
1865         /* Start critical section */
1866         pthread_mutex_lock(&context->parent_context->count_mutex);
1867         ntdest = context->parent_context->num_output_bytes;
1868         _sw32(bstarts + nblock_ * 4, ntdest); /* update block start counter */
1869         if ( (cbytes == 0) || (ntdest+cbytes > maxbytes) ) {
1870           context->parent_context->thread_giveup_code = 0;  /* uncompressible buffer */
1871           pthread_mutex_unlock(&context->parent_context->count_mutex);
1872           break;
1873         }
1874         context->parent_context->thread_nblock++;
1875         nblock_ = context->parent_context->thread_nblock;
1876         context->parent_context->num_output_bytes += cbytes;           /* update return bytes counter */
1877         pthread_mutex_unlock(&context->parent_context->count_mutex);
1878         /* End of critical section */
1879 
1880         /* Copy the compressed buffer to destination */
1881         blosc_internal_fastcopy(dest + ntdest, tmp2, cbytes);
1882       }
1883       else {
1884         nblock_++;
1885         /* Update counter for this thread */
1886         ntbytes += cbytes;
1887       }
1888 
1889     } /* closes while (nblock_) */
1890 
1891     /* Sum up all the bytes decompressed */
1892     if ((!compress || (flags & BLOSC_MEMCPYED)) && context->parent_context->thread_giveup_code > 0) {
1893       /* Update global counter for all threads (decompression only) */
1894       pthread_mutex_lock(&context->parent_context->count_mutex);
1895       context->parent_context->num_output_bytes += ntbytes;
1896       pthread_mutex_unlock(&context->parent_context->count_mutex);
1897     }
1898 
1899     /* Meeting point for all threads (wait for finalization) */
1900     WAIT_FINISH(NULL, context->parent_context);
1901   }
1902 
1903   /* Cleanup our working space and context */
1904   my_free(context->tmp);
1905   my_free(context);
1906 
1907   return(NULL);
1908 }
1909 
1910 
init_threads(struct blosc_context * context)1911 static int init_threads(struct blosc_context* context)
1912 {
1913   int32_t tid;
1914   int rc2;
1915   int32_t ebsize;
1916   struct thread_context* thread_context;
1917 
1918   /* Initialize mutex and condition variable objects */
1919   pthread_mutex_init(&context->count_mutex, NULL);
1920 
1921   /* Set context thread sentinels */
1922   context->thread_giveup_code = 1;
1923   context->thread_nblock = -1;
1924 
1925   /* Barrier initialization */
1926 #ifdef _POSIX_BARRIERS_MINE
1927   pthread_barrier_init(&context->barr_init, NULL, context->numthreads+1);
1928   pthread_barrier_init(&context->barr_finish, NULL, context->numthreads+1);
1929 #else
1930   pthread_mutex_init(&context->count_threads_mutex, NULL);
1931   pthread_cond_init(&context->count_threads_cv, NULL);
1932   context->count_threads = 0;      /* Reset threads counter */
1933 #endif
1934 
1935 #if !defined(_WIN32)
1936   /* Initialize and set thread detached attribute */
1937   pthread_attr_init(&context->ct_attr);
1938   pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
1939 #endif
1940 
1941   /* Finally, create the threads in detached state */
1942   for (tid = 0; tid < context->numthreads; tid++) {
1943     context->tids[tid] = tid;
1944 
1945     /* Create a thread context thread owns context (will destroy when finished) */
1946     thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
1947     thread_context->parent_context = context;
1948     thread_context->tid = tid;
1949 
1950     ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
1951     thread_context->tmp = my_malloc(context->blocksize + ebsize + context->blocksize);
1952     thread_context->tmp2 = thread_context->tmp + context->blocksize;
1953     thread_context->tmp3 = thread_context->tmp + context->blocksize + ebsize;
1954     thread_context->tmpblocksize = context->blocksize;
1955 
1956 #if !defined(_WIN32)
1957     rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc, (void *)thread_context);
1958 #else
1959     rc2 = pthread_create(&context->threads[tid], NULL, t_blosc, (void *)thread_context);
1960 #endif
1961     if (rc2) {
1962       fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc2);
1963       fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
1964       return(-1);
1965     }
1966   }
1967 
1968 
1969   return(0);
1970 }
1971 
blosc_get_nthreads(void)1972 int blosc_get_nthreads(void)
1973 {
1974   int ret = g_threads;
1975 
1976   return ret;
1977 }
1978 
blosc_set_nthreads(int nthreads_new)1979 int blosc_set_nthreads(int nthreads_new)
1980 {
1981   int ret = g_threads;
1982 
1983   /* Check if should initialize */
1984   if (!g_initlib) blosc_init();
1985 
1986   if (nthreads_new != ret){
1987     /* Re-initialize Blosc */
1988     blosc_destroy();
1989     blosc_init();
1990     g_threads = nthreads_new;
1991   }
1992 
1993   return ret;
1994 }
1995 
blosc_set_nthreads_(struct blosc_context * context)1996 int blosc_set_nthreads_(struct blosc_context* context)
1997 {
1998   if (context->numthreads > BLOSC_MAX_THREADS) {
1999     fprintf(stderr,
2000             "Error.  nthreads cannot be larger than BLOSC_MAX_THREADS (%d)",
2001             BLOSC_MAX_THREADS);
2002     return -1;
2003   }
2004   else if (context->numthreads <= 0) {
2005     fprintf(stderr, "Error.  nthreads must be a positive integer");
2006     return -1;
2007   }
2008 
2009   /* Launch a new pool of threads */
2010   if (context->numthreads > 1 && context->numthreads != context->threads_started) {
2011     blosc_release_threadpool(context);
2012     init_threads(context);
2013   }
2014 
2015   /* We have now started the threads */
2016   context->threads_started = context->numthreads;
2017 
2018   return context->numthreads;
2019 }
2020 
blosc_get_compressor(void)2021 const char* blosc_get_compressor(void)
2022 {
2023   const char* compname;
2024   blosc_compcode_to_compname(g_compressor, &compname);
2025 
2026   return compname;
2027 }
2028 
blosc_set_compressor(const char * compname)2029 int blosc_set_compressor(const char *compname)
2030 {
2031   int code = blosc_compname_to_compcode(compname);
2032 
2033   g_compressor = code;
2034 
2035   /* Check if should initialize */
2036   if (!g_initlib) blosc_init();
2037 
2038   return code;
2039 }
2040 
blosc_list_compressors(void)2041 const char* blosc_list_compressors(void)
2042 {
2043   static int compressors_list_done = 0;
2044   static char ret[256];
2045 
2046   if (compressors_list_done) return ret;
2047   ret[0] = '\0';
2048   strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
2049 #if defined(HAVE_LZ4)
2050   strcat(ret, ","); strcat(ret, BLOSC_LZ4_COMPNAME);
2051   strcat(ret, ","); strcat(ret, BLOSC_LZ4HC_COMPNAME);
2052 #endif /* HAVE_LZ4 */
2053 #if defined(HAVE_SNAPPY)
2054   strcat(ret, ","); strcat(ret, BLOSC_SNAPPY_COMPNAME);
2055 #endif /* HAVE_SNAPPY */
2056 #if defined(HAVE_ZLIB)
2057   strcat(ret, ","); strcat(ret, BLOSC_ZLIB_COMPNAME);
2058 #endif /* HAVE_ZLIB */
2059 #if defined(HAVE_ZSTD)
2060   strcat(ret, ","); strcat(ret, BLOSC_ZSTD_COMPNAME);
2061 #endif /* HAVE_ZSTD */
2062   compressors_list_done = 1;
2063   return ret;
2064 }
2065 
blosc_get_version_string(void)2066 const char* blosc_get_version_string(void)
2067 {
2068   return BLOSC_VERSION_STRING;
2069 }
2070 
blosc_get_complib_info(const char * compname,char ** complib,char ** version)2071 int blosc_get_complib_info(const char *compname, char **complib, char **version)
2072 {
2073   int clibcode;
2074   const char *clibname;
2075   const char *clibversion = "unknown";
2076 
2077 #if (defined(HAVE_LZ4) && defined(LZ4_VERSION_MAJOR)) || (defined(HAVE_SNAPPY) && defined(SNAPPY_VERSION)) || defined(ZSTD_VERSION_MAJOR)
2078   char sbuffer[256];
2079 #endif
2080 
2081   clibcode = compname_to_clibcode(compname);
2082   clibname = clibcode_to_clibname(clibcode);
2083 
2084   /* complib version */
2085   if (clibcode == BLOSC_BLOSCLZ_LIB) {
2086     clibversion = BLOSCLZ_VERSION_STRING;
2087   }
2088 #if defined(HAVE_LZ4)
2089   else if (clibcode == BLOSC_LZ4_LIB) {
2090 #if defined(LZ4_VERSION_MAJOR)
2091     sprintf(sbuffer, "%d.%d.%d",
2092             LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
2093     clibversion = sbuffer;
2094 #endif /* LZ4_VERSION_MAJOR */
2095   }
2096 #endif /* HAVE_LZ4 */
2097 #if defined(HAVE_SNAPPY)
2098   else if (clibcode == BLOSC_SNAPPY_LIB) {
2099 #if defined(SNAPPY_VERSION)
2100     sprintf(sbuffer, "%d.%d.%d", SNAPPY_MAJOR, SNAPPY_MINOR, SNAPPY_PATCHLEVEL);
2101     clibversion = sbuffer;
2102 #endif /* SNAPPY_VERSION */
2103   }
2104 #endif /* HAVE_SNAPPY */
2105 #if defined(HAVE_ZLIB)
2106   else if (clibcode == BLOSC_ZLIB_LIB) {
2107     clibversion = ZLIB_VERSION;
2108   }
2109 #endif /* HAVE_ZLIB */
2110 #if defined(HAVE_ZSTD)
2111   else if (clibcode == BLOSC_ZSTD_LIB) {
2112     sprintf(sbuffer, "%d.%d.%d",
2113             ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
2114     clibversion = sbuffer;
2115   }
2116 #endif /* HAVE_ZSTD */
2117   else {
2118     /* Unsupported library */
2119     if (complib != NULL) *complib = NULL;
2120     if (version != NULL) *version = NULL;
2121     return -1;
2122   }
2123 
2124   if (complib != NULL) *complib = strdup(clibname);
2125   if (version != NULL) *version = strdup(clibversion);
2126 
2127   return clibcode;
2128 }
2129 
2130 /* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */
blosc_cbuffer_sizes(const void * cbuffer,size_t * nbytes,size_t * cbytes,size_t * blocksize)2131 void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes,
2132                          size_t *cbytes, size_t *blocksize)
2133 {
2134   uint8_t *_src = (uint8_t *)(cbuffer);    /* current pos for source buffer */
2135   uint8_t version = _src[0];               /* version of header */
2136 
2137   if (version != BLOSC_VERSION_FORMAT) {
2138     *nbytes = *blocksize = *cbytes = 0;
2139     return;
2140   }
2141 
2142   /* Read the interesting values */
2143   *nbytes = (size_t)sw32_(_src + 4);       /* uncompressed buffer size */
2144   *blocksize = (size_t)sw32_(_src + 8);    /* block size */
2145   *cbytes = (size_t)sw32_(_src + 12);      /* compressed buffer size */
2146 }
2147 
blosc_cbuffer_validate(const void * cbuffer,size_t cbytes,size_t * nbytes)2148 int blosc_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) {
2149   size_t header_cbytes, header_blocksize;
2150   if (cbytes < BLOSC_MIN_HEADER_LENGTH) return -1;
2151   blosc_cbuffer_sizes(cbuffer, nbytes, &header_cbytes, &header_blocksize);
2152   if (header_cbytes != cbytes) return -1;
2153   if (*nbytes > BLOSC_MAX_BUFFERSIZE) return -1;
2154   return 0;
2155 }
2156 
2157 /* Return `typesize` and `flags` from a compressed buffer. */
blosc_cbuffer_metainfo(const void * cbuffer,size_t * typesize,int * flags)2158 void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize,
2159                             int *flags)
2160 {
2161   uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */
2162 
2163   uint8_t version = _src[0];               /* version of header */
2164 
2165   if (version != BLOSC_VERSION_FORMAT) {
2166     *flags = *typesize = 0;
2167     return;
2168   }
2169 
2170   /* Read the interesting values */
2171   *flags = (int)_src[2] & 7;             /* first three flags */
2172   *typesize = (size_t)_src[3];           /* typesize */
2173 }
2174 
2175 
2176 /* Return version information from a compressed buffer. */
/* Read the format version bytes from the header of `cbuffer`:
   byte 0 is the Blosc format version and byte 1 is the Lempel-Ziv
   compressor format version.  No validation is performed. */
void blosc_cbuffer_versions(const void *cbuffer, int *version,
                            int *versionlz)
{
  const uint8_t *hdr = (const uint8_t *)cbuffer;

  *version = hdr[0];
  *versionlz = hdr[1];
}
2186 
2187 
2188 /* Return the compressor library/format used in a compressed buffer. */
/* Return the name of the compression library recorded in the header of
   `cbuffer`.  The library code is stored in the top three bits of byte 2. */
const char *blosc_cbuffer_complib(const void *cbuffer)
{
  const uint8_t *hdr = (const uint8_t *)cbuffer;
  const int libcode = (hdr[2] >> 5) & 0x07;

  return clibcode_to_clibname(libcode);
}
2200 
2201 /* Get the internal blocksize to be used during compression.  0 means
2202    that an automatic blocksize is computed internally. */
blosc_get_blocksize(void)2203 int blosc_get_blocksize(void)
2204 {
2205   return (int)g_force_blocksize;
2206 }
2207 
2208 /* Force the use of a specific blocksize.  If 0, an automatic
2209    blocksize will be used (the default). */
blosc_set_blocksize(size_t size)2210 void blosc_set_blocksize(size_t size)
2211 {
2212   g_force_blocksize = (int32_t)size;
2213 }
2214 
2215 /* Force the use of a specific split mode. */
blosc_set_splitmode(int mode)2216 void blosc_set_splitmode(int mode)
2217 {
2218   g_splitmode = mode;
2219 }
2220 
2221 /* Child global context is invalid and pool threads no longer exist post-fork.
2222  * Discard the old, inconsistent global context and global context mutex and
2223  * mark as uninitialized.  Subsequent calls through `blosc_*` interfaces will
2224  * trigger re-init of the global context.
2225  *
2226  * All pthread interfaces have undefined behavior in child handler in current
2227  * posix standards: http://pubs.opengroup.org/onlinepubs/9699919799/
2228  */
blosc_atfork_child(void)2229 void blosc_atfork_child(void) {
2230   if (!g_initlib) return;
2231 
2232   g_initlib = 0;
2233 
2234   my_free(global_comp_mutex);
2235   global_comp_mutex = NULL;
2236 
2237   my_free(g_global_context);
2238   g_global_context = NULL;
2239 
2240 }
2241 
blosc_init(void)2242 void blosc_init(void)
2243 {
2244   /* Return if we are already initialized */
2245   if (g_initlib) return;
2246 
2247   global_comp_mutex = (pthread_mutex_t*)my_malloc(sizeof(pthread_mutex_t));
2248   pthread_mutex_init(global_comp_mutex, NULL);
2249 
2250   g_global_context = (struct blosc_context*)my_malloc(sizeof(struct blosc_context));
2251   g_global_context->threads_started = 0;
2252 
2253   #if !defined(_WIN32)
2254   /* atfork handlers are only be registered once, though multiple re-inits may
2255    * occur via blosc_destroy/blosc_init.  */
2256   if (!g_atfork_registered) {
2257     g_atfork_registered = 1;
2258     pthread_atfork(NULL, NULL, &blosc_atfork_child);
2259   }
2260   #endif
2261 
2262   g_initlib = 1;
2263 }
2264 
blosc_destroy(void)2265 void blosc_destroy(void)
2266 {
2267   /* Return if Blosc is not initialized */
2268   if (!g_initlib) return;
2269 
2270   g_initlib = 0;
2271 
2272   blosc_release_threadpool(g_global_context);
2273   my_free(g_global_context);
2274   g_global_context = NULL;
2275 
2276   pthread_mutex_destroy(global_comp_mutex);
2277   my_free(global_comp_mutex);
2278   global_comp_mutex = NULL;
2279 }
2280 
/* Stop and join the worker threads of `context`, then destroy the
   synchronization objects and thread attributes that init_threads()
   created for them.

   If no pool is running (threads_started <= 0), only threads_started is
   reset.  The workers free their own thread_context on exit (t_blosc), so
   nothing per-thread is freed here.  Returns 0 on the visible paths;
   pthread_join() failures are reported on stderr but not propagated. */
int blosc_release_threadpool(struct blosc_context* context)
{
  int32_t t;
  void* status;
  int rc;
  int rc2;
  (void)rc;  // just to avoid 'unused-variable' warning

  if (context->threads_started > 0)
  {
    /* Tell all existing threads to finish */
    context->end_threads = 1;

    /* Sync threads: the workers waiting at their init sync point are
       released, see end_threads set, and leave their loop (t_blosc).
       NOTE(review): WAIT_INIT is defined elsewhere; its first argument
       suggests it may return -1 from this function on a sync error --
       confirm. */
    WAIT_INIT(-1, context);

    /* Join exiting threads */
    for (t=0; t<context->threads_started; t++) {
      rc2 = pthread_join(context->threads[t], &status);
      if (rc2) {
        fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2);
        fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
      }
    }

    /* Release mutex and condition variable objects */
    pthread_mutex_destroy(&context->count_mutex);

    /* Barriers */
  #ifdef _POSIX_BARRIERS_MINE
      pthread_barrier_destroy(&context->barr_init);
      pthread_barrier_destroy(&context->barr_finish);
  #else
      pthread_mutex_destroy(&context->count_threads_mutex);
      pthread_cond_destroy(&context->count_threads_cv);
  #endif

      /* Thread attributes */
  #if !defined(_WIN32)
      pthread_attr_destroy(&context->ct_attr);
  #endif

  }

  /* No pool is running any more */
  context->threads_started = 0;

  return 0;
}
2329 
blosc_free_resources(void)2330 int blosc_free_resources(void)
2331 {
2332   /* Return if Blosc is not initialized */
2333   if (!g_initlib) return -1;
2334 
2335   return blosc_release_threadpool(g_global_context);
2336 }
2337