1 /*********************************************************************
2 Blosc - Blocked Shuffling and Compression Library
3
4 Author: Francesc Alted <francesc@blosc.org>
5 Creation date: 2009-05-20
6
7 See LICENSES/BLOSC.txt for details about copyright and rights to use.
8 **********************************************************************/
9
10
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <errno.h>
14 #include <string.h>
15 #include <sys/types.h>
16 #include <assert.h>
17
18 #include "fastcopy.h"
19
20 #if defined(USING_CMAKE)
21 #include "config.h"
22 #endif /* USING_CMAKE */
23 #include "blosc.h"
24 #include "shuffle.h"
25 #include "blosclz.h"
26 #if defined(HAVE_LZ4)
27 #include "lz4.h"
28 #include "lz4hc.h"
29 #endif /* HAVE_LZ4 */
30 #if defined(HAVE_SNAPPY)
31 #include "snappy-c.h"
32 #endif /* HAVE_SNAPPY */
33 #if defined(HAVE_ZLIB)
34 #include "zlib.h"
35 #endif /* HAVE_ZLIB */
36 #if defined(HAVE_ZSTD)
37 #include "zstd.h"
38 #endif /* HAVE_ZSTD */
39
40 #if defined(_WIN32) && !defined(__MINGW32__)
41 #include <windows.h>
42 #include <malloc.h>
43
44 /* stdint.h only available in VS2010 (VC++ 16.0) and newer */
45 #if defined(_MSC_VER) && _MSC_VER < 1600
46 #include "win32/stdint-windows.h"
47 #else
48 #include <stdint.h>
49 #endif
50
51 #include <process.h>
52 #define getpid _getpid
53 #else
54 #include <stdint.h>
55 #include <unistd.h>
56 #include <inttypes.h>
57 #endif /* _WIN32 */
58
59 /* Include the win32/pthread.h library for all the Windows builds. See #224. */
60 #if defined(_WIN32)
61 #include "win32/pthread.h"
62 #include "win32/pthread.c"
63 #else
64 #include <pthread.h>
65 #endif
66
67
/* Some useful units */
#define KB 1024
#define MB (1024 * (KB))

/* Minimum buffer size to be compressed */
#define MIN_BUFFERSIZE 128       /* Cannot be smaller than 66 */

/* The maximum number of splits in a block for compression */
#define MAX_SPLITS 16            /* Cannot be larger than 128 */

/* The size of L1 cache. 32 KB is quite common nowadays. */
#define L1 (32 * (KB))

/* Use POSIX barriers only when _POSIX_BARRIERS is strictly greater than
   200112L: we have had problems using them when the symbol value is exactly
   200112L.  Any other value (including the POSIX values -1 and 0) selects
   the mutex/condition-variable fallback.  This requires more investigation,
   but will work for the moment.
   (The previous spelling compared against the typo `20012L`; the result is
   the same for every value POSIX allows.) */
#if defined(_POSIX_BARRIERS) && ((_POSIX_BARRIERS - 200112L) > 0)
#define _POSIX_BARRIERS_MINE
#endif
86 /* Synchronization variables */
87
88
/* State for one compression or decompression job.  The same struct carries
   the thread-pool bookkeeping used by parallel_blosc() and the WAIT_INIT /
   WAIT_FINISH macros.  NOTE(review): field order is kept as-is in case any
   code elsewhere relies on it. */
struct blosc_context {
  int32_t compress;               /* 1 if we are doing compression 0 if decompress */

  /* Source buffer.  When decompressing it starts with the header:
     serial_blosc() reads memcpy'ed blocks past BLOSC_MAX_OVERHEAD. */
  const uint8_t* src;
  /* Destination buffer; serial_blosc() addresses every block relative to
     its start (not a moving "current position"). */
  uint8_t* dest;
  /* Points at the header's flags byte.  Bits read in this file:
     BLOSC_DOSHUFFLE, BLOSC_DOBITSHUFFLE, BLOSC_MEMCPYED, 0x10 (don't split)
     and 0xe0 (compressor format, see initialize_decompress_func). */
  uint8_t* header_flags;
  int compversion;                /* Compressor version byte, only used during decompression */
  int32_t sourcesize;             /* Number of bytes in source buffer (or uncompressed bytes in compressed file) */
  int32_t compressedsize;         /* Number of bytes of compressed data (only used when decompressing) */
  int32_t nblocks;                /* Number of total blocks in buffer (the last may be partial, see leftover) */
  int32_t leftover;               /* Extra bytes at end of buffer; serial_blosc treats > 0 as "last block is partial" */
  int32_t blocksize;              /* Length of the block in bytes */
  int32_t typesize;               /* Type size */
  int32_t num_output_bytes;       /* Counter for the number of output bytes; serial_blosc starts its total from it */
  int32_t destsize;               /* Maximum size for destination buffer (the maxbytes bound given to blosc_c) */
  /* Table of per-block start offsets, one 4-byte little-endian entry per
     block (written with _sw32, read with sw32_), past the header info. */
  uint8_t* bstarts;
  int32_t compcode;               /* Compressor code to use (BLOSC_BLOSCLZ, BLOSC_LZ4, ...) */
  int clevel;                     /* Compression level (0-9; compute_blocksize treats 0 as plain copy) */
  /* Function to use for decompression.  Only used when decompressing; set by
     initialize_decompress_func() from the header's compressor format. */
  int (*decompress_func)(const void* input, int compressed_length, void* output,
                         int maxout);

  /* Threading */
  int32_t numthreads;             /* Number of threads; do_job runs serially when it is 1 */
  int32_t threads_started;        /* NOTE(review): presumably number of pool threads running — confirm */
  int32_t end_threads;            /* NOTE(review): presumably asks the workers to exit — confirm */
  pthread_t threads[BLOSC_MAX_THREADS];
  int32_t tids[BLOSC_MAX_THREADS];
  pthread_mutex_t count_mutex;    /* NOTE(review): presumably guards shared counters such as thread_nblock — confirm */
#ifdef _POSIX_BARRIERS_MINE
  /* Rendezvous points used by WAIT_INIT / WAIT_FINISH */
  pthread_barrier_t barr_init;
  pthread_barrier_t barr_finish;
#else
  /* Mutex/condition-variable fallback used by WAIT_INIT / WAIT_FINISH */
  int32_t count_threads;
  pthread_mutex_t count_threads_mutex;
  pthread_cond_t count_threads_cv;
#endif
#if !defined(_WIN32)
  pthread_attr_t ct_attr;      /* creation time attrs for threads */
#endif
  /* Error code when give up: parallel_blosc() resets it to 1 and reads
     a value > 0 as success, anything else as the error to return. */
  int32_t thread_giveup_code;
  int32_t thread_nblock;       /* block counter; parallel_blosc() resets it to -1 per job */
};
132
/* Per-thread state for a worker.  NOTE(review): presumably one instance per
   pool thread; the code that creates and uses it is not shown here — confirm. */
struct thread_context {
  struct blosc_context* parent_context;  /* the job this worker serves (assumed shared with other workers) */
  int32_t tid;                           /* thread id */
  /* Temporary buffers (presumably the per-thread counterparts of the tmp/tmp2
     scratch that serial_blosc allocates — confirm) */
  uint8_t* tmp;
  uint8_t* tmp2;
  uint8_t* tmp3;
  int32_t tmpblocksize; /* Used to keep track of how big the temporary buffers are */
};
141
/* Global context for non-contextual API */
static struct blosc_context* g_global_context;
/* NOTE(review): presumably serializes use of g_global_context by the
   non-contextual API — confirm where it is locked. */
static pthread_mutex_t* global_comp_mutex;
static int32_t g_compressor = BLOSC_BLOSCLZ;  /* the compressor to use by default */
static int32_t g_threads = 1;                 /* default number of threads (assumed) */
static int32_t g_force_blocksize = 0;         /* 0: blocksize is not forced (assumed) */
static int32_t g_initlib = 0;                 /* nonzero once the library is initialized (assumed) */
static int32_t g_atfork_registered = 0;       /* nonzero once fork handlers are registered (assumed) */
static int32_t g_splitmode = BLOSC_FORWARD_COMPAT_SPLIT;  /* split policy read by split_block() */



/* Wrapped function to adjust the number of threads used by blosc.
   parallel_blosc() treats a negative return value as failure. */
int blosc_set_nthreads_(struct blosc_context*);

/* Releases the global threadpool */
int blosc_release_threadpool(struct blosc_context* context);
159
/* Macros for synchronization.
 *
 * Both macros are used as statements inside functions and `return RET_VAL`
 * from that function on failure.  The POSIX-barrier variants assign to a
 * local `int rc` that the calling function must declare.  Each expansion is
 * wrapped in do { } while (0) so it behaves as exactly one statement (safe
 * after an unbraced `if`), and CONTEXT_PTR is parenthesized so any pointer
 * expression can be passed.
 */

/* Wait until all threads are initialized */
#ifdef _POSIX_BARRIERS_MINE
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                                     \
  do {                                                                      \
    rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_init);                   \
    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {                   \
      printf("Could not wait on barrier (init): %d\n", rc);                 \
      return ((RET_VAL));                                                   \
    }                                                                       \
  } while (0)
#else
/* Mutex/condition-variable rendezvous: each caller that finds
   count_threads below numthreads increments it and sleeps; the caller that
   finds it has reached numthreads wakes everybody up.
   NOTE(review): pthread_cond_wait may return spuriously and there is no
   predicate loop here; a robust fix needs a generation counter in
   struct blosc_context. */
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                                     \
  do {                                                                      \
    pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);                \
    if ((CONTEXT_PTR)->count_threads < (CONTEXT_PTR)->numthreads) {         \
      (CONTEXT_PTR)->count_threads++;                                       \
      pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,                   \
                        &(CONTEXT_PTR)->count_threads_mutex);               \
    }                                                                       \
    else {                                                                  \
      pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv);             \
    }                                                                       \
    pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);              \
  } while (0)
#endif

/* Wait for all threads to finish */
#ifdef _POSIX_BARRIERS_MINE
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                                   \
  do {                                                                      \
    rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_finish);                 \
    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {                   \
      printf("Could not wait on barrier (finish)\n");                       \
      return ((RET_VAL));                                                   \
    }                                                                       \
  } while (0)
#else
/* Mirror image of WAIT_INIT: callers decrement count_threads and sleep;
   the caller that finds it at 0 wakes everybody up (same spurious-wakeup
   caveat as above). */
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                                   \
  do {                                                                      \
    pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);                \
    if ((CONTEXT_PTR)->count_threads > 0) {                                 \
      (CONTEXT_PTR)->count_threads--;                                       \
      pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,                   \
                        &(CONTEXT_PTR)->count_threads_mutex);               \
    }                                                                       \
    else {                                                                  \
      pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv);             \
    }                                                                       \
    pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);              \
  } while (0)
#endif
203
204
/* A function for aligned malloc that is portable.
 *
 * Returns a block of at least `size` bytes, aligned to 32 bytes when the
 * platform provides an aligned allocator (_aligned_malloc or
 * posix_memalign), otherwise whatever malloc() provides.  Returns NULL on
 * failure after writing a diagnostic to stderr.  Release the block with
 * my_free(): on Windows it comes from _aligned_malloc().
 */
static uint8_t *my_malloc(size_t size)
{
  void *block = NULL;
  int res = 0;

  /* Do an alignment to 32 bytes because AVX2 is supported */
#if defined(_WIN32)
  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
  block = (void *)_aligned_malloc(size, 32);
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
  /* Platform does have an implementation of posix_memalign */
  res = posix_memalign(&block, 32, size);
#else
  /* No aligned allocator known here: fall back to plain malloc */
  block = malloc(size);
#endif /* _WIN32 */

  if (block == NULL || res != 0) {
    /* Library diagnostics go to stderr (newline-terminated) so they never
       get mixed into data a host program writes to stdout. */
    fprintf(stderr, "Error allocating memory!\n");
    return NULL;
  }

  return (uint8_t *)block;
}
229
230
/* Give back a block obtained from my_malloc().  Each branch must mirror the
   allocator my_malloc() uses on the same platform. */
static void my_free(void *block)
{
#if !defined(_WIN32)
  free(block);
#else
  _aligned_free(block);
#endif
}
240
241
/* Decode the 32-bit little-endian integer stored at `pa[0..3]`, whatever the
   byte order of the host. */
static int32_t sw32_(const uint8_t *pa)
{
  const uint32_t bits = (uint32_t)pa[0]
                      | ((uint32_t)pa[1] << 8)
                      | ((uint32_t)pa[2] << 16)
                      | ((uint32_t)pa[3] << 24);
  int32_t value;

  /* Reinterpret the bit pattern (rather than converting the value) so the
     result matches copying the bytes into an int32_t. */
  memcpy(&value, &bits, sizeof value);
  return value;
}
266
267
/* Store `a` at `dest[0..3]` as a 32-bit little-endian integer, whatever the
   byte order of the host (the inverse of sw32_). */
static void _sw32(uint8_t* dest, int32_t a)
{
  /* Signed-to-unsigned conversion is well defined (reduction modulo 2^32)
     and yields the two's-complement bit pattern of `a`. */
  const uint32_t bits = (uint32_t)a;
  int shift;

  for (shift = 0; shift < 32; shift += 8) {
    *dest++ = (uint8_t)(bits >> shift);
  }
}
290
291 /*
292 * Conversion routines between compressor and compression libraries
293 */
294
295 /* Return the library code associated with the compressor name */
compname_to_clibcode(const char * compname)296 static int compname_to_clibcode(const char *compname)
297 {
298 if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
299 return BLOSC_BLOSCLZ_LIB;
300 if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
301 return BLOSC_LZ4_LIB;
302 if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
303 return BLOSC_LZ4_LIB;
304 if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0)
305 return BLOSC_SNAPPY_LIB;
306 if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
307 return BLOSC_ZLIB_LIB;
308 if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
309 return BLOSC_ZSTD_LIB;
310 return -1;
311 }
312
313 /* Return the library name associated with the compressor code */
clibcode_to_clibname(int clibcode)314 static const char *clibcode_to_clibname(int clibcode)
315 {
316 if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
317 if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
318 if (clibcode == BLOSC_SNAPPY_LIB) return BLOSC_SNAPPY_LIBNAME;
319 if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
320 if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
321 return NULL; /* should never happen */
322 }
323
324
325 /*
326 * Conversion routines between compressor names and compressor codes
327 */
328
329 /* Get the compressor name associated with the compressor code */
blosc_compcode_to_compname(int compcode,const char ** compname)330 int blosc_compcode_to_compname(int compcode, const char **compname)
331 {
332 int code = -1; /* -1 means non-existent compressor code */
333 const char *name = NULL;
334
335 /* Map the compressor code */
336 if (compcode == BLOSC_BLOSCLZ)
337 name = BLOSC_BLOSCLZ_COMPNAME;
338 else if (compcode == BLOSC_LZ4)
339 name = BLOSC_LZ4_COMPNAME;
340 else if (compcode == BLOSC_LZ4HC)
341 name = BLOSC_LZ4HC_COMPNAME;
342 else if (compcode == BLOSC_SNAPPY)
343 name = BLOSC_SNAPPY_COMPNAME;
344 else if (compcode == BLOSC_ZLIB)
345 name = BLOSC_ZLIB_COMPNAME;
346 else if (compcode == BLOSC_ZSTD)
347 name = BLOSC_ZSTD_COMPNAME;
348
349 *compname = name;
350
351 /* Guess if there is support for this code */
352 if (compcode == BLOSC_BLOSCLZ)
353 code = BLOSC_BLOSCLZ;
354 #if defined(HAVE_LZ4)
355 else if (compcode == BLOSC_LZ4)
356 code = BLOSC_LZ4;
357 else if (compcode == BLOSC_LZ4HC)
358 code = BLOSC_LZ4HC;
359 #endif /* HAVE_LZ4 */
360 #if defined(HAVE_SNAPPY)
361 else if (compcode == BLOSC_SNAPPY)
362 code = BLOSC_SNAPPY;
363 #endif /* HAVE_SNAPPY */
364 #if defined(HAVE_ZLIB)
365 else if (compcode == BLOSC_ZLIB)
366 code = BLOSC_ZLIB;
367 #endif /* HAVE_ZLIB */
368 #if defined(HAVE_ZSTD)
369 else if (compcode == BLOSC_ZSTD)
370 code = BLOSC_ZSTD;
371 #endif /* HAVE_ZSTD */
372
373 return code;
374 }
375
376 /* Get the compressor code for the compressor name. -1 if it is not available */
blosc_compname_to_compcode(const char * compname)377 int blosc_compname_to_compcode(const char *compname)
378 {
379 int code = -1; /* -1 means non-existent compressor code */
380
381 if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
382 code = BLOSC_BLOSCLZ;
383 }
384 #if defined(HAVE_LZ4)
385 else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
386 code = BLOSC_LZ4;
387 }
388 else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
389 code = BLOSC_LZ4HC;
390 }
391 #endif /* HAVE_LZ4 */
392 #if defined(HAVE_SNAPPY)
393 else if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0) {
394 code = BLOSC_SNAPPY;
395 }
396 #endif /* HAVE_SNAPPY */
397 #if defined(HAVE_ZLIB)
398 else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
399 code = BLOSC_ZLIB;
400 }
401 #endif /* HAVE_ZLIB */
402 #if defined(HAVE_ZSTD)
403 else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
404 code = BLOSC_ZSTD;
405 }
406 #endif /* HAVE_ZSTD */
407
408 return code;
409 }
410
411
#if defined(HAVE_LZ4)
/* LZ4 "fast" compression of `input_length` bytes into at most `maxout` bytes.
 * `accel` is the acceleration factor computed by get_accel().
 * Returns the result of LZ4_compress_fast (the compressed size on success),
 * or -1 when `input_length` does not fit in an int: the LZ4 API takes int
 * sizes and the cast would otherwise silently truncate the length.
 */
static int lz4_wrap_compress(const char* input, size_t input_length,
                             char* output, size_t maxout, int accel)
{
  int cbytes;
  if (input_length > (size_t)INT32_MAX)
    return -1;  /* input larger than 2 GB is not supported */
  cbytes = LZ4_compress_fast(input, output, (int)input_length, (int)maxout,
                             accel);
  return cbytes;
}

/* LZ4 high-compression mode.  Same contract as lz4_wrap_compress, with the
 * Blosc `clevel` passed through as the LZ4HC level.  The guard rejects
 * anything that does not fit in an int; the previous bound (2 << 30 == 2^31)
 * still admitted a 2^31-byte input, which overflows the (int) cast.
 */
static int lz4hc_wrap_compress(const char* input, size_t input_length,
                               char* output, size_t maxout, int clevel)
{
  int cbytes;
  if (input_length > (size_t)INT32_MAX)
    return -1;  /* input larger than 2 GB is not supported */
  /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
   * but levels larger than 9 do not buy much compression. */
  cbytes = LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
                           clevel);
  return cbytes;
}

/* LZ4 decompression into at most `maxout` bytes; returns the result of
   LZ4_decompress_safe (blosc_d compares it with the expected size). */
static int lz4_wrap_decompress(const void* input, int compressed_length,
                               void* output, int maxout)
{
  return LZ4_decompress_safe(input, output, compressed_length, maxout);
}

#endif /* HAVE_LZ4 */
442
#if defined(HAVE_SNAPPY)
/* Snappy compression into a buffer of `maxout` bytes.  Returns the compressed
   size, or 0 when snappy_compress does not report SNAPPY_OK. */
static int snappy_wrap_compress(const char* input, size_t input_length,
                                char* output, size_t maxout)
{
  size_t compressed_len = maxout;  /* in: capacity, out: bytes produced */

  if (snappy_compress(input, input_length, output, &compressed_len) == SNAPPY_OK) {
    return (int)compressed_len;
  }
  return 0;
}

/* Snappy decompression into a buffer of `maxout` bytes.  Returns the number of
   bytes produced, or 0 when snappy_uncompress does not report SNAPPY_OK. */
static int snappy_wrap_decompress(const void* input, int compressed_length,
                                  void* output, int maxout)
{
  size_t produced = maxout;  /* in: capacity, out: bytes produced */

  if (snappy_uncompress(input, compressed_length, output, &produced) != SNAPPY_OK) {
    return 0;
  }
  return (int)produced;
}
#endif /* HAVE_SNAPPY */
468
#if defined(HAVE_ZLIB)
/* zlib is not very respectful with sharing name space with others.
   Fortunately, its names do not collide with those already in blosc. */

/* Deflate `input` with compress2() at level `clevel` into at most `maxout`
   bytes.  Returns the compressed size, or 0 when zlib does not return Z_OK. */
static int zlib_wrap_compress(const char* input, size_t input_length,
                              char* output, size_t maxout, int clevel)
{
  uLongf out_len = maxout;  /* in: capacity, out: bytes produced */
  const int rc = compress2((Bytef*)output, &out_len, (Bytef*)input,
                           (uLong)input_length, clevel);

  return (rc == Z_OK) ? (int)out_len : 0;
}

/* Inflate a zlib stream with uncompress() into at most `maxout` bytes.
   Returns the number of bytes produced, or 0 when zlib does not return Z_OK. */
static int zlib_wrap_decompress(const void* input, int compressed_length,
                                void* output, int maxout) {
  uLongf out_len = maxout;  /* in: capacity, out: bytes produced */
  const int rc = uncompress((Bytef*)output, &out_len, (Bytef*)input,
                            (uLong)compressed_length);

  return (rc == Z_OK) ? (int)out_len : 0;
}
#endif /* HAVE_ZLIB */
497
#if defined(HAVE_ZSTD)
/* zstd compression of `input_length` bytes into at most `maxout` bytes.
 *
 * The Blosc clevel is mapped onto zstd's level scale:
 *   clevel <= 7 -> 2 * clevel - 1   (1 -> 1, 2 -> 3, ..., 7 -> 13)
 *   clevel == 8 -> ZSTD_maxCLevel() - 2
 *   clevel >= 9 -> ZSTD_maxCLevel()
 * Returns the compressed size, or 0 when zstd reports an error.
 */
static int zstd_wrap_compress(const char* input, size_t input_length,
                              char* output, size_t maxout, int clevel) {
  size_t code;
  int zlevel;

  if (clevel >= 9) {
    zlevel = ZSTD_maxCLevel();
  }
  else if (clevel == 8) {
    /* Make the level 8 close enough to maxCLevel.  This must test the Blosc
       level: the former check ran after the 2*clevel-1 mapping, whose result
       is always odd, so it could never match (level 8 silently became 15). */
    zlevel = ZSTD_maxCLevel() - 2;
  }
  else {
    zlevel = clevel * 2 - 1;
  }

  code = ZSTD_compress(
      (void*)output, maxout, (void*)input, input_length, zlevel);
  if (ZSTD_isError(code)) {
    return 0;
  }
  return (int)code;
}

/* zstd decompression into at most `maxout` bytes.  Returns the number of bytes
   produced, or 0 when zstd reports an error. */
static int zstd_wrap_decompress(const void* input, int compressed_length,
                                void* output, int maxout) {
  size_t code;
  code = ZSTD_decompress(
      (void*)output, maxout, (void*)input, compressed_length);
  if (ZSTD_isError(code)) {
    return 0;
  }
  return (int)code;
}
#endif /* HAVE_ZSTD */
524
initialize_decompress_func(struct blosc_context * context)525 static int initialize_decompress_func(struct blosc_context* context) {
526 int8_t header_flags = *(context->header_flags);
527 int32_t compformat = (header_flags & 0xe0) >> 5;
528 int compversion = context->compversion;
529
530 if (compformat == BLOSC_BLOSCLZ_FORMAT) {
531 if (compversion != BLOSC_BLOSCLZ_VERSION_FORMAT) {
532 return -9;
533 }
534 context->decompress_func = &blosclz_decompress;
535 return 0;
536 }
537 #if defined(HAVE_LZ4)
538 if (compformat == BLOSC_LZ4_FORMAT) {
539 if (compversion != BLOSC_LZ4_VERSION_FORMAT) {
540 return -9;
541 }
542 context->decompress_func = &lz4_wrap_decompress;
543 return 0;
544 }
545 #endif /* HAVE_LZ4 */
546 #if defined(HAVE_SNAPPY)
547 if (compformat == BLOSC_SNAPPY_FORMAT) {
548 if (compversion != BLOSC_SNAPPY_VERSION_FORMAT) {
549 return -9;
550 }
551 context->decompress_func = &snappy_wrap_decompress;
552 return 0;
553 }
554 #endif /* HAVE_SNAPPY */
555 #if defined(HAVE_ZLIB)
556 if (compformat == BLOSC_ZLIB_FORMAT) {
557 if (compversion != BLOSC_ZLIB_VERSION_FORMAT) {
558 return -9;
559 }
560 context->decompress_func = &zlib_wrap_decompress;
561 return 0;
562 }
563 #endif /* HAVE_ZLIB */
564 #if defined(HAVE_ZSTD)
565 if (compformat == BLOSC_ZSTD_FORMAT) {
566 if (compversion != BLOSC_ZSTD_VERSION_FORMAT) {
567 return -9;
568 }
569 context->decompress_func = &zstd_wrap_decompress;
570 return 0;
571 }
572 #endif /* HAVE_ZSTD */
573 return -5; /* signals no decompression support */
574 }
575
576 /* Compute acceleration for blosclz */
get_accel(const struct blosc_context * context)577 static int get_accel(const struct blosc_context* context) {
578 int32_t clevel = context->clevel;
579
580 if (context->compcode == BLOSC_LZ4) {
581 /* This acceleration setting based on discussions held in:
582 * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
583 */
584 return (10 - clevel);
585 }
586 return 1;
587 }
588
589
/* Shuffle & compress a single block.
 *
 * Parameters:
 *   context        job state; reads header_flags, typesize, compcode, clevel.
 *   blocksize      number of bytes of `src` to compress.
 *   leftoverblock  nonzero for the final, partial block (never split).
 *   ntbytes        bytes already written to the output before this block;
 *                  combined with `maxbytes` to bound every write.
 *   maxbytes       total capacity of the output buffer.
 *   src, dest      input block and the output position for this block.
 *   tmp, tmp2      scratch buffers for the shuffle / bitshuffle filters.
 *
 * Output layout: for each split, a 4-byte little-endian size (written with
 * _sw32) followed by that many bytes of compressed data, or the raw split
 * (size == neblock) when the codec could not compress it.
 *
 * Returns the number of bytes written for this block; 0 when the block does
 * not fit in the remaining output space (serial_blosc treats that as
 * incompressible data); or a negative code: -1 codec overran maxout, -2 codec
 * returned a negative size, -5 codec not compiled in, or the negative value
 * returned by blosc_internal_bitshuffle.
 */
static int blosc_c(const struct blosc_context* context, int32_t blocksize,
                   int32_t leftoverblock, int32_t ntbytes, int32_t maxbytes,
                   const uint8_t *src, uint8_t *dest, uint8_t *tmp,
                   uint8_t *tmp2)
{
  int8_t header_flags = *(context->header_flags);
  int dont_split = (header_flags & 0x10) >> 4;  /* flag bit 4: store one stream */
  int32_t j, neblock, nsplits;
  int32_t cbytes;                   /* number of compressed bytes in split */
  int32_t ctbytes = 0;              /* number of compressed bytes in block */
  int32_t maxout;
  int32_t typesize = context->typesize;
  const uint8_t *_tmp = src;        /* data actually fed to the codec */
  const char *compname;
  int accel;
  int bscount;
  int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
  int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
                      (blocksize >= typesize));

  if (doshuffle) {
    /* Byte shuffling only makes sense if typesize > 1 */
    blosc_internal_shuffle(typesize, blocksize, src, tmp);
    _tmp = tmp;
  }
  /* We don't allow more than 1 filter at the same time (yet) */
  else if (dobitshuffle) {
    bscount = blosc_internal_bitshuffle(typesize, blocksize, src, tmp, tmp2);
    if (bscount < 0)
      return bscount;
    _tmp = tmp;
  }

  /* Calculate acceleration for different compressors */
  accel = get_accel(context);

  /* The number of splits for this block: typesize streams of
     blocksize / typesize bytes each, or a single stream */
  if (!dont_split && !leftoverblock) {
    nsplits = typesize;
  }
  else {
    nsplits = 1;
  }
  neblock = blocksize / nsplits;
  for (j = 0; j < nsplits; j++) {
    /* Reserve room for this split's 4-byte size; it is filled in by
       _sw32(dest - 4, ...) once the size is known */
    dest += sizeof(int32_t);
    ntbytes += (int32_t)sizeof(int32_t);
    ctbytes += (int32_t)sizeof(int32_t);
    maxout = neblock;
#if defined(HAVE_SNAPPY)
    if (context->compcode == BLOSC_SNAPPY) {
      /* TODO perhaps refactor this to keep the value stashed somewhere */
      maxout = snappy_max_compressed_length(neblock);
    }
#endif /* HAVE_SNAPPY */
    if (ntbytes+maxout > maxbytes) {
      maxout = maxbytes - ntbytes;   /* avoid buffer overrun */
      if (maxout <= 0) {
        return 0;                  /* non-compressible block */
      }
    }
    if (context->compcode == BLOSC_BLOSCLZ) {
      cbytes = blosclz_compress(context->clevel, _tmp+j*neblock, neblock,
                                dest, maxout);
    }
#if defined(HAVE_LZ4)
    else if (context->compcode == BLOSC_LZ4) {
      cbytes = lz4_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
                                 (char *)dest, (size_t)maxout, accel);
    }
    else if (context->compcode == BLOSC_LZ4HC) {
      cbytes = lz4hc_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
                                   (char *)dest, (size_t)maxout,
                                   context->clevel);
    }
#endif /* HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
    else if (context->compcode == BLOSC_SNAPPY) {
      cbytes = snappy_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
                                    (char *)dest, (size_t)maxout);
    }
#endif /* HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
    else if (context->compcode == BLOSC_ZLIB) {
      cbytes = zlib_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
                                  (char *)dest, (size_t)maxout,
                                  context->clevel);
    }
#endif /* HAVE_ZLIB */
#if defined(HAVE_ZSTD)
    else if (context->compcode == BLOSC_ZSTD) {
      cbytes = zstd_wrap_compress((char*)_tmp + j * neblock, (size_t)neblock,
                                  (char*)dest, (size_t)maxout, context->clevel);
    }
#endif /* HAVE_ZSTD */

    else {
      /* Codec not compiled into this build */
      blosc_compcode_to_compname(context->compcode, &compname);
      if (compname == NULL) {
          compname = "(null)";
      }
      fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
      fprintf(stderr, "compression support.  Please use one having it.");
      return -5;    /* signals no compression support */
    }

    if (cbytes > maxout) {
      /* Buffer overrun caused by compression (should never happen) */
      return -1;
    }
    else if (cbytes < 0) {
      /* cbytes should never be negative */
      return -2;
    }
    else if (cbytes == 0 || cbytes == neblock) {
      /* The compressor has been unable to compress data at all. */
      /* Store the split verbatim: a stored size equal to neblock is what
         tells blosc_d to copy the split instead of decompressing it. */
      /* Before doing the copy, check that we are not running into a
         buffer overflow. */
      if ((ntbytes+neblock) > maxbytes) {
        return 0;    /* Non-compressible data */
      }
      fastcopy(dest, _tmp + j * neblock, neblock);
      cbytes = neblock;
    }
    _sw32(dest - 4, cbytes);  /* fill in the size reserved above */
    dest += cbytes;
    ntbytes += cbytes;
    ctbytes += cbytes;
  }  /* Closes j < nsplits */

  return ctbytes;
}
723
724 /* Decompress & unshuffle a single block */
blosc_d(struct blosc_context * context,int32_t blocksize,int32_t leftoverblock,const uint8_t * base_src,int32_t src_offset,uint8_t * dest,uint8_t * tmp,uint8_t * tmp2)725 static int blosc_d(struct blosc_context* context, int32_t blocksize,
726 int32_t leftoverblock, const uint8_t* base_src,
727 int32_t src_offset, uint8_t* dest, uint8_t* tmp,
728 uint8_t* tmp2) {
729 int8_t header_flags = *(context->header_flags);
730 int dont_split = (header_flags & 0x10) >> 4;
731 int32_t j, neblock, nsplits;
732 int32_t nbytes; /* number of decompressed bytes in split */
733 const int32_t compressedsize = context->compressedsize;
734 int32_t cbytes; /* number of compressed bytes in split */
735 int32_t ctbytes = 0; /* number of compressed bytes in block */
736 int32_t ntbytes = 0; /* number of uncompressed bytes in block */
737 uint8_t *_tmp = dest;
738 int32_t typesize = context->typesize;
739 int bscount;
740 int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
741 int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
742 (blocksize >= typesize));
743 const uint8_t* src;
744
745 if (doshuffle || dobitshuffle) {
746 _tmp = tmp;
747 }
748
749 /* The number of splits for this block */
750 if (!dont_split &&
751 /* For compatibility with before the introduction of the split flag */
752 ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE) &&
753 !leftoverblock) {
754 nsplits = typesize;
755 }
756 else {
757 nsplits = 1;
758 }
759
760 neblock = blocksize / nsplits;
761 for (j = 0; j < nsplits; j++) {
762 /* Validate src_offset */
763 if (src_offset < 0 || src_offset > compressedsize - sizeof(int32_t)) {
764 return -1;
765 }
766 cbytes = sw32_(base_src + src_offset); /* amount of compressed bytes */
767 src_offset += sizeof(int32_t);
768 /* Validate cbytes */
769 if (cbytes < 0 || cbytes > context->compressedsize - src_offset) {
770 return -1;
771 }
772 ctbytes += (int32_t)sizeof(int32_t);
773 src = base_src + src_offset;
774 /* Uncompress */
775 if (cbytes == neblock) {
776 fastcopy(_tmp, src, neblock);
777 nbytes = neblock;
778 }
779 else {
780 nbytes = context->decompress_func(src, cbytes, _tmp, neblock);
781 /* Check that decompressed bytes number is correct */
782 if (nbytes != neblock) {
783 return -2;
784 }
785 }
786 src_offset += cbytes;
787 ctbytes += cbytes;
788 _tmp += nbytes;
789 ntbytes += nbytes;
790 } /* Closes j < nsplits */
791
792 if (doshuffle) {
793 blosc_internal_unshuffle(typesize, blocksize, tmp, dest);
794 }
795 else if (dobitshuffle) {
796 bscount = blosc_internal_bitunshuffle(typesize, blocksize, tmp, dest, tmp2);
797 if (bscount < 0)
798 return bscount;
799 }
800
801 /* Return the number of uncompressed bytes */
802 return ntbytes;
803 }
804
805 /* Serial version for compression/decompression */
serial_blosc(struct blosc_context * context)806 static int serial_blosc(struct blosc_context* context)
807 {
808 int32_t j, bsize, leftoverblock;
809 int32_t cbytes;
810
811 int32_t ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
812 int32_t ntbytes = context->num_output_bytes;
813
814 uint8_t *tmp = my_malloc(context->blocksize + ebsize);
815 uint8_t *tmp2 = tmp + context->blocksize;
816
817 for (j = 0; j < context->nblocks; j++) {
818 if (context->compress && !(*(context->header_flags) & BLOSC_MEMCPYED)) {
819 _sw32(context->bstarts + j * 4, ntbytes);
820 }
821 bsize = context->blocksize;
822 leftoverblock = 0;
823 if ((j == context->nblocks - 1) && (context->leftover > 0)) {
824 bsize = context->leftover;
825 leftoverblock = 1;
826 }
827 if (context->compress) {
828 if (*(context->header_flags) & BLOSC_MEMCPYED) {
829 /* We want to memcpy only */
830 fastcopy(context->dest + BLOSC_MAX_OVERHEAD + j * context->blocksize,
831 context->src + j * context->blocksize, bsize);
832 cbytes = bsize;
833 }
834 else {
835 /* Regular compression */
836 cbytes = blosc_c(context, bsize, leftoverblock, ntbytes,
837 context->destsize, context->src+j*context->blocksize,
838 context->dest+ntbytes, tmp, tmp2);
839 if (cbytes == 0) {
840 ntbytes = 0; /* uncompressible data */
841 break;
842 }
843 }
844 }
845 else {
846 if (*(context->header_flags) & BLOSC_MEMCPYED) {
847 /* We want to memcpy only */
848 fastcopy(context->dest + j * context->blocksize,
849 context->src + BLOSC_MAX_OVERHEAD + j * context->blocksize, bsize);
850 cbytes = bsize;
851 }
852 else {
853 /* Regular decompression */
854 cbytes = blosc_d(context, bsize, leftoverblock, context->src,
855 sw32_(context->bstarts + j * 4),
856 context->dest + j * context->blocksize, tmp, tmp2);
857 }
858 }
859 if (cbytes < 0) {
860 ntbytes = cbytes; /* error in blosc_c or blosc_d */
861 break;
862 }
863 ntbytes += cbytes;
864 }
865
866 /* Free temporaries */
867 my_free(tmp);
868
869 return ntbytes;
870 }
871
872
873 /* Threaded version for compression/decompression */
parallel_blosc(struct blosc_context * context)874 static int parallel_blosc(struct blosc_context* context)
875 {
876 int rc;
877 (void)rc; // just to avoid 'unused-variable' warning
878
879 /* Check whether we need to restart threads */
880 if (blosc_set_nthreads_(context) < 0) {
881 return -1;
882 }
883
884 /* Set sentinels */
885 context->thread_giveup_code = 1;
886 context->thread_nblock = -1;
887
888 /* Synchronization point for all threads (wait for initialization) */
889 WAIT_INIT(-1, context);
890
891 /* Synchronization point for all threads (wait for finalization) */
892 WAIT_FINISH(-1, context);
893
894 if (context->thread_giveup_code > 0) {
895 /* Return the total bytes (de-)compressed in threads */
896 return context->num_output_bytes;
897 }
898 else {
899 /* Compression/decompression gave up. Return error code. */
900 return context->thread_giveup_code;
901 }
902 }
903
904
905 /* Do the compression or decompression of the buffer depending on the
906 global params. */
do_job(struct blosc_context * context)907 static int do_job(struct blosc_context* context)
908 {
909 int32_t ntbytes;
910
911 /* Run the serial version when nthreads is 1 or when the buffers are
912 not much larger than blocksize */
913 if (context->numthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
914 ntbytes = serial_blosc(context);
915 }
916 else {
917 ntbytes = parallel_blosc(context);
918 }
919
920 return ntbytes;
921 }
922
923
/* Whether a codec is meant for High Compression Ratios; expands to an int
   expression that is 1 or 0.  `codec` may be evaluated up to three times,
   so pass an expression without side effects. */
#define HCR(codec) (((codec) == BLOSC_LZ4HC) || \
                    ((codec) == BLOSC_ZLIB)  || \
                    ((codec) == BLOSC_ZSTD))
929
930
931 /* Conditions for splitting a block before compressing with a codec. */
split_block(int compcode,int typesize,int blocksize)932 static int split_block(int compcode, int typesize, int blocksize) {
933 int splitblock = -1;
934
935 switch (g_splitmode) {
936 case BLOSC_ALWAYS_SPLIT:
937 splitblock = 1;
938 break;
939 case BLOSC_NEVER_SPLIT:
940 splitblock = 0;
941 break;
942 case BLOSC_AUTO_SPLIT:
943 /* Normally all the compressors designed for speed benefit from a
944 split. However, in conducted benchmarks LZ4 seems that it runs
945 faster if we don't split, which is quite surprising. */
946 splitblock= (((compcode == BLOSC_BLOSCLZ) ||
947 (compcode == BLOSC_SNAPPY)) &&
948 (typesize <= MAX_SPLITS) &&
949 (blocksize / typesize) >= MIN_BUFFERSIZE);
950 break;
951 case BLOSC_FORWARD_COMPAT_SPLIT:
952 /* The zstd support was introduced at the same time than the split flag, so
953 * there should be not a problem with not splitting bloscks with it */
954 splitblock = ((compcode != BLOSC_ZSTD) &&
955 (typesize <= MAX_SPLITS) &&
956 (blocksize / typesize) >= MIN_BUFFERSIZE);
957 break;
958 default:
959 fprintf(stderr, "Split mode %d not supported", g_splitmode);
960 }
961 return splitblock;
962 }
963
964
compute_blocksize(struct blosc_context * context,int32_t clevel,int32_t typesize,int32_t nbytes,int32_t forced_blocksize)965 static int32_t compute_blocksize(struct blosc_context* context, int32_t clevel,
966 int32_t typesize, int32_t nbytes,
967 int32_t forced_blocksize)
968 {
969 int32_t blocksize;
970
971 /* Protection against very small buffers */
972 if (nbytes < (int32_t)typesize) {
973 return 1;
974 }
975
976 blocksize = nbytes; /* Start by a whole buffer as blocksize */
977
978 if (forced_blocksize) {
979 blocksize = forced_blocksize;
980 /* Check that forced blocksize is not too small */
981 if (blocksize < MIN_BUFFERSIZE) {
982 blocksize = MIN_BUFFERSIZE;
983 }
984 /* Check that forced blocksize is not too large */
985 if (blocksize > BLOSC_MAX_BLOCKSIZE) {
986 blocksize = BLOSC_MAX_BLOCKSIZE;
987 }
988 }
989 else if (nbytes >= L1) {
990 blocksize = L1;
991
992 /* For HCR codecs, increase the block sizes by a factor of 2 because they
993 are meant for compressing large blocks (i.e. they show a big overhead
994 when compressing small ones). */
995 if (HCR(context->compcode)) {
996 blocksize *= 2;
997 }
998
999 switch (clevel) {
1000 case 0:
1001 /* Case of plain copy */
1002 blocksize /= 4;
1003 break;
1004 case 1:
1005 blocksize /= 2;
1006 break;
1007 case 2:
1008 blocksize *= 1;
1009 break;
1010 case 3:
1011 blocksize *= 2;
1012 break;
1013 case 4:
1014 case 5:
1015 blocksize *= 4;
1016 break;
1017 case 6:
1018 case 7:
1019 case 8:
1020 blocksize *= 8;
1021 break;
1022 case 9:
1023 blocksize *= 8;
1024 if (HCR(context->compcode)) {
1025 blocksize *= 2;
1026 }
1027 break;
1028 default:
1029 assert(0);
1030 break;
1031 }
1032 }
1033
1034 /* Enlarge the blocksize for splittable codecs */
1035 if (clevel > 0 && split_block(context->compcode, typesize, blocksize)) {
1036 if (blocksize > (1 << 18)) {
1037 /* Do not use a too large split buffer (> 256 KB) for splitting codecs */
1038 blocksize = (1 << 18);
1039 }
1040 blocksize *= typesize;
1041 if (blocksize < (1 << 16)) {
1042 /* Do not use a too small blocksize (< 64 KB) when typesize is small */
1043 blocksize = (1 << 16);
1044 }
1045 if (blocksize > 1024 * 1024) {
1046 /* But do not exceed 1 MB per thread (having this capacity in L3 is normal in modern CPUs) */
1047 blocksize = 1024 * 1024;
1048 }
1049
1050 }
1051
1052 /* Check that blocksize is not too large */
1053 if (blocksize > (int32_t)nbytes) {
1054 blocksize = nbytes;
1055 }
1056
1057 /* blocksize *must absolutely* be a multiple of the typesize */
1058 if (blocksize > typesize) {
1059 blocksize = blocksize / typesize * typesize;
1060 }
1061
1062 return blocksize;
1063 }
1064
initialize_context_compression(struct blosc_context * context,int clevel,int doshuffle,size_t typesize,size_t sourcesize,const void * src,void * dest,size_t destsize,int32_t compressor,int32_t blocksize,int32_t numthreads)1065 static int initialize_context_compression(struct blosc_context* context,
1066 int clevel,
1067 int doshuffle,
1068 size_t typesize,
1069 size_t sourcesize,
1070 const void* src,
1071 void* dest,
1072 size_t destsize,
1073 int32_t compressor,
1074 int32_t blocksize,
1075 int32_t numthreads)
1076 {
1077 char *envvar = NULL;
1078 int warnlvl = 0;
1079 /* Set parameters */
1080 context->compress = 1;
1081 context->src = (const uint8_t*)src;
1082 context->dest = (uint8_t *)(dest);
1083 context->num_output_bytes = 0;
1084 context->destsize = (int32_t)destsize;
1085 context->sourcesize = sourcesize;
1086 context->typesize = typesize;
1087 context->compcode = compressor;
1088 context->numthreads = numthreads;
1089 context->end_threads = 0;
1090 context->clevel = clevel;
1091
1092 envvar = getenv("BLOSC_WARN");
1093 if (envvar != NULL) {
1094 warnlvl = strtol(envvar, NULL, 10);
1095 }
1096
1097 /* Check buffer size limits */
1098 if (sourcesize > BLOSC_MAX_BUFFERSIZE) {
1099 if (warnlvl > 0) {
1100 fprintf(stderr, "Input buffer size cannot exceed %d bytes\n",
1101 BLOSC_MAX_BUFFERSIZE);
1102 }
1103 return 0;
1104 }
1105 if (destsize < BLOSC_MAX_OVERHEAD) {
1106 if (warnlvl > 0) {
1107 fprintf(stderr, "Output buffer size should be larger than %d bytes\n",
1108 BLOSC_MAX_OVERHEAD);
1109 }
1110 return 0;
1111 }
1112
1113 /* Compression level */
1114 if (clevel < 0 || clevel > 9) {
1115 fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
1116 return -10;
1117 }
1118
1119 /* Shuffle */
1120 if (doshuffle != 0 && doshuffle != 1 && doshuffle != 2) {
1121 fprintf(stderr, "`shuffle` parameter must be either 0, 1 or 2!\n");
1122 return -10;
1123 }
1124
1125 /* Check typesize limits */
1126 if (context->typesize > BLOSC_MAX_TYPESIZE) {
1127 /* If typesize is too large, treat buffer as an 1-byte stream. */
1128 context->typesize = 1;
1129 }
1130
1131 /* Get the blocksize */
1132 context->blocksize = compute_blocksize(context, clevel, (int32_t)context->typesize, context->sourcesize, blocksize);
1133
1134 /* Compute number of blocks in buffer */
1135 context->nblocks = context->sourcesize / context->blocksize;
1136 context->leftover = context->sourcesize % context->blocksize;
1137 context->nblocks = (context->leftover > 0) ? (context->nblocks + 1) : context->nblocks;
1138
1139 return 1;
1140 }
1141
1142
write_compression_header(struct blosc_context * context,int clevel,int doshuffle)1143 static int write_compression_header(struct blosc_context* context, int clevel, int doshuffle)
1144 {
1145 int32_t compformat;
1146 int dont_split;
1147
1148 /* Write version header for this block */
1149 context->dest[0] = BLOSC_VERSION_FORMAT; /* blosc format version */
1150
1151 /* Write compressor format */
1152 compformat = -1;
1153 switch (context->compcode)
1154 {
1155 case BLOSC_BLOSCLZ:
1156 compformat = BLOSC_BLOSCLZ_FORMAT;
1157 context->dest[1] = BLOSC_BLOSCLZ_VERSION_FORMAT; /* blosclz format version */
1158 break;
1159
1160 #if defined(HAVE_LZ4)
1161 case BLOSC_LZ4:
1162 compformat = BLOSC_LZ4_FORMAT;
1163 context->dest[1] = BLOSC_LZ4_VERSION_FORMAT; /* lz4 format version */
1164 break;
1165 case BLOSC_LZ4HC:
1166 compformat = BLOSC_LZ4HC_FORMAT;
1167 context->dest[1] = BLOSC_LZ4HC_VERSION_FORMAT; /* lz4hc is the same as lz4 */
1168 break;
1169 #endif /* HAVE_LZ4 */
1170
1171 #if defined(HAVE_SNAPPY)
1172 case BLOSC_SNAPPY:
1173 compformat = BLOSC_SNAPPY_FORMAT;
1174 context->dest[1] = BLOSC_SNAPPY_VERSION_FORMAT; /* snappy format version */
1175 break;
1176 #endif /* HAVE_SNAPPY */
1177
1178 #if defined(HAVE_ZLIB)
1179 case BLOSC_ZLIB:
1180 compformat = BLOSC_ZLIB_FORMAT;
1181 context->dest[1] = BLOSC_ZLIB_VERSION_FORMAT; /* zlib format version */
1182 break;
1183 #endif /* HAVE_ZLIB */
1184
1185 #if defined(HAVE_ZSTD)
1186 case BLOSC_ZSTD:
1187 compformat = BLOSC_ZSTD_FORMAT;
1188 context->dest[1] = BLOSC_ZSTD_VERSION_FORMAT; /* zstd format version */
1189 break;
1190 #endif /* HAVE_ZSTD */
1191
1192 default:
1193 {
1194 const char *compname;
1195 compname = clibcode_to_clibname(compformat);
1196 if (compname == NULL) {
1197 compname = "(null)";
1198 }
1199 fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
1200 fprintf(stderr, "compression support. Please use one having it.");
1201 return -5; /* signals no compression support */
1202 break;
1203 }
1204 }
1205
1206 context->header_flags = context->dest+2; /* flags */
1207 context->dest[2] = 0; /* zeroes flags */
1208 context->dest[3] = (uint8_t)context->typesize; /* type size */
1209 _sw32(context->dest + 4, context->sourcesize); /* size of the buffer */
1210 _sw32(context->dest + 8, context->blocksize); /* block size */
1211 context->bstarts = context->dest + 16; /* starts for every block */
1212 context->num_output_bytes = 16 + sizeof(int32_t)*context->nblocks; /* space for header and pointers */
1213
1214 if (context->clevel == 0) {
1215 /* Compression level 0 means buffer to be memcpy'ed */
1216 *(context->header_flags) |= BLOSC_MEMCPYED;
1217 context->num_output_bytes = 16; /* space just for header */
1218 }
1219
1220 if (context->sourcesize < MIN_BUFFERSIZE) {
1221 /* Buffer is too small. Try memcpy'ing. */
1222 *(context->header_flags) |= BLOSC_MEMCPYED;
1223 context->num_output_bytes = 16; /* space just for header */
1224 }
1225
1226 if (doshuffle == BLOSC_SHUFFLE) {
1227 /* Byte-shuffle is active */
1228 *(context->header_flags) |= BLOSC_DOSHUFFLE; /* bit 0 set to one in flags */
1229 }
1230
1231 if (doshuffle == BLOSC_BITSHUFFLE) {
1232 /* Bit-shuffle is active */
1233 *(context->header_flags) |= BLOSC_DOBITSHUFFLE; /* bit 2 set to one in flags */
1234 }
1235
1236 dont_split = !split_block(context->compcode, context->typesize,
1237 context->blocksize);
1238 *(context->header_flags) |= dont_split << 4; /* dont_split is in bit 4 */
1239 *(context->header_flags) |= compformat << 5; /* compressor format starts at bit 5 */
1240
1241 return 1;
1242 }
1243
1244
blosc_compress_context(struct blosc_context * context)1245 int blosc_compress_context(struct blosc_context* context)
1246 {
1247 int32_t ntbytes = 0;
1248
1249 if ((*(context->header_flags) & BLOSC_MEMCPYED) &&
1250 (context->sourcesize + BLOSC_MAX_OVERHEAD > context->destsize)) {
1251 return 0; /* data cannot be copied without overrun destination */
1252 }
1253
1254 /* Do the actual compression */
1255 ntbytes = do_job(context);
1256 if (ntbytes < 0) {
1257 return -1;
1258 }
1259 if ((ntbytes == 0) && (context->sourcesize + BLOSC_MAX_OVERHEAD <= context->destsize)) {
1260 /* Last chance for fitting `src` buffer in `dest`. Update flags and force a copy. */
1261 *(context->header_flags) |= BLOSC_MEMCPYED;
1262 context->num_output_bytes = BLOSC_MAX_OVERHEAD; /* reset the output bytes in previous step */
1263 ntbytes = do_job(context);
1264 if (ntbytes < 0) {
1265 return -1;
1266 }
1267 }
1268
1269 /* Set the number of compressed bytes in header */
1270 _sw32(context->dest + 12, ntbytes);
1271
1272 assert(ntbytes <= context->destsize);
1273 return ntbytes;
1274 }
1275
1276 /* The public routine for compression with context. */
blosc_compress_ctx(int clevel,int doshuffle,size_t typesize,size_t nbytes,const void * src,void * dest,size_t destsize,const char * compressor,size_t blocksize,int numinternalthreads)1277 int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize,
1278 size_t nbytes, const void* src, void* dest,
1279 size_t destsize, const char* compressor,
1280 size_t blocksize, int numinternalthreads)
1281 {
1282 int error, result;
1283 struct blosc_context context;
1284
1285 context.threads_started = 0;
1286 error = initialize_context_compression(&context, clevel, doshuffle, typesize,
1287 nbytes, src, dest, destsize,
1288 blosc_compname_to_compcode(compressor),
1289 blocksize, numinternalthreads);
1290 if (error <= 0) { return error; }
1291
1292 error = write_compression_header(&context, clevel, doshuffle);
1293 if (error <= 0) { return error; }
1294
1295 result = blosc_compress_context(&context);
1296
1297 if (numinternalthreads > 1)
1298 {
1299 blosc_release_threadpool(&context);
1300 }
1301
1302 return result;
1303 }
1304
1305 /* The public routine for compression. See blosc.h for docstrings. */
blosc_compress(int clevel,int doshuffle,size_t typesize,size_t nbytes,const void * src,void * dest,size_t destsize)1306 int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
1307 const void *src, void *dest, size_t destsize)
1308 {
1309 int result;
1310 char* envvar;
1311
1312 /* Check if should initialize */
1313 if (!g_initlib) blosc_init();
1314
1315 /* Check for environment variables */
1316 envvar = getenv("BLOSC_CLEVEL");
1317 if (envvar != NULL) {
1318 long value;
1319 value = strtol(envvar, NULL, 10);
1320 if ((value != EINVAL) && (value >= 0)) {
1321 clevel = (int)value;
1322 }
1323 }
1324
1325 envvar = getenv("BLOSC_SHUFFLE");
1326 if (envvar != NULL) {
1327 if (strcmp(envvar, "NOSHUFFLE") == 0) {
1328 doshuffle = BLOSC_NOSHUFFLE;
1329 }
1330 if (strcmp(envvar, "SHUFFLE") == 0) {
1331 doshuffle = BLOSC_SHUFFLE;
1332 }
1333 if (strcmp(envvar, "BITSHUFFLE") == 0) {
1334 doshuffle = BLOSC_BITSHUFFLE;
1335 }
1336 }
1337
1338 envvar = getenv("BLOSC_TYPESIZE");
1339 if (envvar != NULL) {
1340 long value;
1341 value = strtol(envvar, NULL, 10);
1342 if ((value != EINVAL) && (value > 0)) {
1343 typesize = (int)value;
1344 }
1345 }
1346
1347 envvar = getenv("BLOSC_COMPRESSOR");
1348 if (envvar != NULL) {
1349 result = blosc_set_compressor(envvar);
1350 if (result < 0) { return result; }
1351 }
1352
1353 envvar = getenv("BLOSC_BLOCKSIZE");
1354 if (envvar != NULL) {
1355 long blocksize;
1356 blocksize = strtol(envvar, NULL, 10);
1357 if ((blocksize != EINVAL) && (blocksize > 0)) {
1358 blosc_set_blocksize((size_t)blocksize);
1359 }
1360 }
1361
1362 envvar = getenv("BLOSC_NTHREADS");
1363 if (envvar != NULL) {
1364 long nthreads;
1365 nthreads = strtol(envvar, NULL, 10);
1366 if ((nthreads != EINVAL) && (nthreads > 0)) {
1367 result = blosc_set_nthreads((int)nthreads);
1368 if (result < 0) { return result; }
1369 }
1370 }
1371
1372 envvar = getenv("BLOSC_SPLITMODE");
1373 if (envvar != NULL) {
1374 if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
1375 blosc_set_splitmode(BLOSC_FORWARD_COMPAT_SPLIT);
1376 }
1377 else if (strcmp(envvar, "AUTO") == 0) {
1378 blosc_set_splitmode(BLOSC_AUTO_SPLIT);
1379 }
1380 else if (strcmp(envvar, "ALWAYS") == 0) {
1381 blosc_set_splitmode(BLOSC_ALWAYS_SPLIT);
1382 }
1383 else if (strcmp(envvar, "NEVER") == 0) {
1384 blosc_set_splitmode(BLOSC_NEVER_SPLIT);
1385 }
1386 else {
1387 fprintf(stderr, "BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
1388 return -1;
1389 }
1390 }
1391
1392 /* Check for a BLOSC_NOLOCK environment variable. It is important
1393 that this should be the last env var so that it can take the
1394 previous ones into account */
1395 envvar = getenv("BLOSC_NOLOCK");
1396 if (envvar != NULL) {
1397 const char *compname;
1398 blosc_compcode_to_compname(g_compressor, &compname);
1399 result = blosc_compress_ctx(clevel, doshuffle, typesize,
1400 nbytes, src, dest, destsize,
1401 compname, g_force_blocksize, g_threads);
1402 return result;
1403 }
1404
1405 pthread_mutex_lock(global_comp_mutex);
1406
1407 do {
1408 result = initialize_context_compression(g_global_context, clevel, doshuffle,
1409 typesize, nbytes, src, dest, destsize,
1410 g_compressor, g_force_blocksize,
1411 g_threads);
1412 if (result <= 0) { break; }
1413
1414 result = write_compression_header(g_global_context, clevel, doshuffle);
1415 if (result <= 0) { break; }
1416
1417 result = blosc_compress_context(g_global_context);
1418 } while (0);
1419
1420 pthread_mutex_unlock(global_comp_mutex);
1421
1422 return result;
1423 }
1424
blosc_run_decompression_with_context(struct blosc_context * context,const void * src,void * dest,size_t destsize,int numinternalthreads)1425 static int blosc_run_decompression_with_context(struct blosc_context* context,
1426 const void* src,
1427 void* dest,
1428 size_t destsize,
1429 int numinternalthreads)
1430 {
1431 uint8_t version;
1432 int32_t ntbytes;
1433
1434 context->compress = 0;
1435 context->src = (const uint8_t*)src;
1436 context->dest = (uint8_t*)dest;
1437 context->destsize = destsize;
1438 context->num_output_bytes = 0;
1439 context->numthreads = numinternalthreads;
1440 context->end_threads = 0;
1441
1442 /* Read the header block */
1443 version = context->src[0]; /* blosc format version */
1444 context->compversion = context->src[1];
1445
1446 context->header_flags = (uint8_t*)(context->src + 2); /* flags */
1447 context->typesize = (int32_t)context->src[3]; /* typesize */
1448 context->sourcesize = sw32_(context->src + 4); /* buffer size */
1449 context->blocksize = sw32_(context->src + 8); /* block size */
1450 context->compressedsize = sw32_(context->src + 12); /* compressed buffer size */
1451 context->bstarts = (uint8_t*)(context->src + 16);
1452
1453 if (context->sourcesize == 0) {
1454 /* Source buffer was empty, so we are done */
1455 return 0;
1456 }
1457
1458 if (context->blocksize <= 0 || context->blocksize > destsize ||
1459 context->blocksize > BLOSC_MAX_BLOCKSIZE || context->typesize <= 0 ||
1460 context->typesize > BLOSC_MAX_TYPESIZE) {
1461 return -1;
1462 }
1463
1464 if (version != BLOSC_VERSION_FORMAT) {
1465 /* Version from future */
1466 return -1;
1467 }
1468 if (*context->header_flags & 0x08) {
1469 /* compressor flags from the future */
1470 return -1;
1471 }
1472
1473 /* Compute some params */
1474 /* Total blocks */
1475 context->nblocks = context->sourcesize / context->blocksize;
1476 context->leftover = context->sourcesize % context->blocksize;
1477 context->nblocks = (context->leftover>0)? context->nblocks+1: context->nblocks;
1478
1479 /* Check that we have enough space to decompress */
1480 if (context->sourcesize > (int32_t)destsize) {
1481 return -1;
1482 }
1483
1484 if (*(context->header_flags) & BLOSC_MEMCPYED) {
1485 /* Validate that compressed size is equal to decompressed size + header
1486 size. */
1487 if (context->sourcesize + BLOSC_MAX_OVERHEAD != context->compressedsize) {
1488 return -1;
1489 }
1490 } else {
1491 ntbytes = initialize_decompress_func(context);
1492 if (ntbytes != 0) return ntbytes;
1493
1494 /* Validate that compressed size is large enough to hold the bstarts array */
1495 if (context->nblocks > (context->compressedsize - 16) / 4) {
1496 return -1;
1497 }
1498 }
1499
1500 /* Do the actual decompression */
1501 ntbytes = do_job(context);
1502 if (ntbytes < 0) {
1503 return -1;
1504 }
1505
1506 assert(ntbytes <= (int32_t)destsize);
1507 return ntbytes;
1508 }
1509
blosc_decompress_ctx(const void * src,void * dest,size_t destsize,int numinternalthreads)1510 int blosc_decompress_ctx(const void* src, void* dest, size_t destsize,
1511 int numinternalthreads) {
1512 int result;
1513 struct blosc_context context;
1514
1515 context.threads_started = 0;
1516 result = blosc_run_decompression_with_context(&context, src, dest, destsize,
1517 numinternalthreads);
1518
1519 if (numinternalthreads > 1)
1520 {
1521 blosc_release_threadpool(&context);
1522 }
1523
1524 return result;
1525 }
1526
blosc_decompress(const void * src,void * dest,size_t destsize)1527 int blosc_decompress(const void* src, void* dest, size_t destsize) {
1528 int result;
1529 char* envvar;
1530 long nthreads;
1531
1532 /* Check if should initialize */
1533 if (!g_initlib) blosc_init();
1534
1535 /* Check for a BLOSC_NTHREADS environment variable */
1536 envvar = getenv("BLOSC_NTHREADS");
1537 if (envvar != NULL) {
1538 nthreads = strtol(envvar, NULL, 10);
1539 if ((nthreads != EINVAL) && (nthreads > 0)) {
1540 result = blosc_set_nthreads((int)nthreads);
1541 if (result < 0) { return result; }
1542 }
1543 }
1544
1545 /* Check for a BLOSC_NOLOCK environment variable. It is important
1546 that this should be the last env var so that it can take the
1547 previous ones into account */
1548 envvar = getenv("BLOSC_NOLOCK");
1549 if (envvar != NULL) {
1550 result = blosc_decompress_ctx(src, dest, destsize, g_threads);
1551 return result;
1552 }
1553
1554 pthread_mutex_lock(global_comp_mutex);
1555
1556 result = blosc_run_decompression_with_context(g_global_context, src, dest,
1557 destsize, g_threads);
1558
1559 pthread_mutex_unlock(global_comp_mutex);
1560
1561 return result;
1562 }
1563
blosc_getitem(const void * src,int start,int nitems,void * dest)1564 int blosc_getitem(const void* src, int start, int nitems, void* dest) {
1565 uint8_t *_src=NULL; /* current pos for source buffer */
1566 uint8_t version, compversion; /* versions for compressed header */
1567 uint8_t flags; /* flags for header */
1568 int32_t ntbytes = 0; /* the number of uncompressed bytes */
1569 int32_t nblocks; /* number of total blocks in buffer */
1570 int32_t leftover; /* extra bytes at end of buffer */
1571 uint8_t *bstarts; /* start pointers for each block */
1572 int32_t typesize, blocksize, nbytes, compressedsize;
1573 int32_t j, bsize, bsize2, leftoverblock;
1574 int32_t cbytes, startb, stopb;
1575 int stop = start + nitems;
1576 uint8_t *tmp;
1577 uint8_t *tmp2;
1578 uint8_t *tmp3;
1579 int32_t ebsize;
1580 struct blosc_context context = {0};
1581
1582 _src = (uint8_t *)(src);
1583
1584 /* Read the header block */
1585 version = _src[0]; /* blosc format version */
1586 compversion = _src[1];
1587 flags = _src[2]; /* flags */
1588 typesize = (int32_t)_src[3]; /* typesize */
1589 nbytes = sw32_(_src + 4); /* buffer size */
1590 blocksize = sw32_(_src + 8); /* block size */
1591 compressedsize = sw32_(_src + 12); /* compressed buffer size */
1592
1593 if (version != BLOSC_VERSION_FORMAT)
1594 return -9;
1595
1596 if (blocksize <= 0 || blocksize > nbytes || blocksize > BLOSC_MAX_BLOCKSIZE ||
1597 typesize <= 0 || typesize > BLOSC_MAX_TYPESIZE) {
1598 return -1;
1599 }
1600
1601 /* Compute some params */
1602 /* Total blocks */
1603 nblocks = nbytes / blocksize;
1604 leftover = nbytes % blocksize;
1605 nblocks = (leftover>0)? nblocks+1: nblocks;
1606
1607 /* Only initialize the fields blosc_d uses */
1608 context.typesize = typesize;
1609 context.header_flags = &flags;
1610 context.compversion = compversion;
1611 context.compressedsize = compressedsize;
1612 if (flags & BLOSC_MEMCPYED) {
1613 if (nbytes + BLOSC_MAX_OVERHEAD != compressedsize) {
1614 return -1;
1615 }
1616 } else {
1617 ntbytes = initialize_decompress_func(&context);
1618 if (ntbytes != 0) return ntbytes;
1619
1620 if (nblocks >= (compressedsize - 16) / 4) {
1621 return -1;
1622 }
1623 }
1624
1625 ebsize = blocksize + typesize * (int32_t)sizeof(int32_t);
1626 tmp = my_malloc(blocksize + ebsize + blocksize);
1627 tmp2 = tmp + blocksize;
1628 tmp3 = tmp + blocksize + ebsize;
1629
1630 _src += 16;
1631 bstarts = _src;
1632 _src += sizeof(int32_t)*nblocks;
1633
1634 /* Check region boundaries */
1635 if ((start < 0) || (start*typesize > nbytes)) {
1636 fprintf(stderr, "`start` out of bounds");
1637 return -1;
1638 }
1639
1640 if ((stop < 0) || (stop*typesize > nbytes)) {
1641 fprintf(stderr, "`start`+`nitems` out of bounds");
1642 return -1;
1643 }
1644
1645 for (j = 0; j < nblocks; j++) {
1646 bsize = blocksize;
1647 leftoverblock = 0;
1648 if ((j == nblocks - 1) && (leftover > 0)) {
1649 bsize = leftover;
1650 leftoverblock = 1;
1651 }
1652
1653 /* Compute start & stop for each block */
1654 startb = start * typesize - j * blocksize;
1655 stopb = stop * typesize - j * blocksize;
1656 if ((startb >= (int)blocksize) || (stopb <= 0)) {
1657 continue;
1658 }
1659 if (startb < 0) {
1660 startb = 0;
1661 }
1662 if (stopb > (int)blocksize) {
1663 stopb = blocksize;
1664 }
1665 bsize2 = stopb - startb;
1666
1667 /* Do the actual data copy */
1668 if (flags & BLOSC_MEMCPYED) {
1669 /* We want to memcpy only */
1670 fastcopy((uint8_t *) dest + ntbytes,
1671 (uint8_t *) src + BLOSC_MAX_OVERHEAD + j * blocksize + startb, bsize2);
1672 cbytes = bsize2;
1673 }
1674 else {
1675 /* Regular decompression. Put results in tmp2. */
1676 cbytes = blosc_d(&context, bsize, leftoverblock,
1677 (uint8_t *)src, sw32_(bstarts + j * 4),
1678 tmp2, tmp, tmp3);
1679 if (cbytes < 0) {
1680 ntbytes = cbytes;
1681 break;
1682 }
1683 /* Copy to destination */
1684 fastcopy((uint8_t *) dest + ntbytes, tmp2 + startb, bsize2);
1685 cbytes = bsize2;
1686 }
1687 ntbytes += cbytes;
1688 }
1689
1690 my_free(tmp);
1691
1692 return ntbytes;
1693 }
1694
1695 /* Decompress & unshuffle several blocks in a single thread */
t_blosc(void * ctxt)1696 static void *t_blosc(void *ctxt)
1697 {
1698 struct thread_context* context = (struct thread_context*)ctxt;
1699 int32_t cbytes, ntdest;
1700 int32_t tblocks; /* number of blocks per thread */
1701 int32_t leftover2;
1702 int32_t tblock; /* limit block on a thread */
1703 int32_t nblock_; /* private copy of nblock */
1704 int32_t bsize, leftoverblock;
1705 /* Parameters for threads */
1706 int32_t blocksize;
1707 int32_t ebsize;
1708 int32_t compress;
1709 int32_t maxbytes;
1710 int32_t ntbytes;
1711 int32_t flags;
1712 int32_t nblocks;
1713 int32_t leftover;
1714 uint8_t *bstarts;
1715 const uint8_t *src;
1716 uint8_t *dest;
1717 uint8_t *tmp;
1718 uint8_t *tmp2;
1719 uint8_t *tmp3;
1720 int rc;
1721 (void)rc; // just to avoid 'unused-variable' warning
1722
1723 while(1)
1724 {
1725 /* Synchronization point for all threads (wait for initialization) */
1726 WAIT_INIT(NULL, context->parent_context);
1727
1728 if(context->parent_context->end_threads)
1729 {
1730 break;
1731 }
1732
1733 /* Get parameters for this thread before entering the main loop */
1734 blocksize = context->parent_context->blocksize;
1735 ebsize = blocksize + context->parent_context->typesize * (int32_t)sizeof(int32_t);
1736 compress = context->parent_context->compress;
1737 flags = *(context->parent_context->header_flags);
1738 maxbytes = context->parent_context->destsize;
1739 nblocks = context->parent_context->nblocks;
1740 leftover = context->parent_context->leftover;
1741 bstarts = context->parent_context->bstarts;
1742 src = context->parent_context->src;
1743 dest = context->parent_context->dest;
1744
1745 if (blocksize > context->tmpblocksize)
1746 {
1747 my_free(context->tmp);
1748 context->tmp = my_malloc(blocksize + ebsize + blocksize);
1749 context->tmp2 = context->tmp + blocksize;
1750 context->tmp3 = context->tmp + blocksize + ebsize;
1751 }
1752
1753 tmp = context->tmp;
1754 tmp2 = context->tmp2;
1755 tmp3 = context->tmp3;
1756
1757 ntbytes = 0; /* only useful for decompression */
1758
1759 if (compress && !(flags & BLOSC_MEMCPYED)) {
1760 /* Compression always has to follow the block order */
1761 pthread_mutex_lock(&context->parent_context->count_mutex);
1762 context->parent_context->thread_nblock++;
1763 nblock_ = context->parent_context->thread_nblock;
1764 pthread_mutex_unlock(&context->parent_context->count_mutex);
1765 tblock = nblocks;
1766 }
1767 else {
1768 /* Decompression can happen using any order. We choose
1769 sequential block order on each thread */
1770
1771 /* Blocks per thread */
1772 tblocks = nblocks / context->parent_context->numthreads;
1773 leftover2 = nblocks % context->parent_context->numthreads;
1774 tblocks = (leftover2>0)? tblocks+1: tblocks;
1775
1776 nblock_ = context->tid*tblocks;
1777 tblock = nblock_ + tblocks;
1778 if (tblock > nblocks) {
1779 tblock = nblocks;
1780 }
1781 }
1782
1783 /* Loop over blocks */
1784 leftoverblock = 0;
1785 while ((nblock_ < tblock) && context->parent_context->thread_giveup_code > 0) {
1786 bsize = blocksize;
1787 if (nblock_ == (nblocks - 1) && (leftover > 0)) {
1788 bsize = leftover;
1789 leftoverblock = 1;
1790 }
1791 if (compress) {
1792 if (flags & BLOSC_MEMCPYED) {
1793 /* We want to memcpy only */
1794 fastcopy(dest + BLOSC_MAX_OVERHEAD + nblock_ * blocksize,
1795 src + nblock_ * blocksize, bsize);
1796 cbytes = bsize;
1797 }
1798 else {
1799 /* Regular compression */
1800 cbytes = blosc_c(context->parent_context, bsize, leftoverblock, 0, ebsize,
1801 src+nblock_*blocksize, tmp2, tmp, tmp3);
1802 }
1803 }
1804 else {
1805 if (flags & BLOSC_MEMCPYED) {
1806 /* We want to memcpy only */
1807 fastcopy(dest + nblock_ * blocksize,
1808 src + BLOSC_MAX_OVERHEAD + nblock_ * blocksize, bsize);
1809 cbytes = bsize;
1810 }
1811 else {
1812 cbytes = blosc_d(context->parent_context, bsize, leftoverblock,
1813 src, sw32_(bstarts + nblock_ * 4),
1814 dest+nblock_*blocksize,
1815 tmp, tmp2);
1816 }
1817 }
1818
1819 /* Check whether current thread has to giveup */
1820 if (context->parent_context->thread_giveup_code <= 0) {
1821 break;
1822 }
1823
1824 /* Check results for the compressed/decompressed block */
1825 if (cbytes < 0) { /* compr/decompr failure */
1826 /* Set giveup_code error */
1827 pthread_mutex_lock(&context->parent_context->count_mutex);
1828 context->parent_context->thread_giveup_code = cbytes;
1829 pthread_mutex_unlock(&context->parent_context->count_mutex);
1830 break;
1831 }
1832
1833 if (compress && !(flags & BLOSC_MEMCPYED)) {
1834 /* Start critical section */
1835 pthread_mutex_lock(&context->parent_context->count_mutex);
1836 ntdest = context->parent_context->num_output_bytes;
1837 _sw32(bstarts + nblock_ * 4, ntdest); /* update block start counter */
1838 if ( (cbytes == 0) || (ntdest+cbytes > maxbytes) ) {
1839 context->parent_context->thread_giveup_code = 0; /* uncompressible buffer */
1840 pthread_mutex_unlock(&context->parent_context->count_mutex);
1841 break;
1842 }
1843 context->parent_context->thread_nblock++;
1844 nblock_ = context->parent_context->thread_nblock;
1845 context->parent_context->num_output_bytes += cbytes; /* update return bytes counter */
1846 pthread_mutex_unlock(&context->parent_context->count_mutex);
1847 /* End of critical section */
1848
1849 /* Copy the compressed buffer to destination */
1850 fastcopy(dest + ntdest, tmp2, cbytes);
1851 }
1852 else {
1853 nblock_++;
1854 /* Update counter for this thread */
1855 ntbytes += cbytes;
1856 }
1857
1858 } /* closes while (nblock_) */
1859
1860 /* Sum up all the bytes decompressed */
1861 if ((!compress || (flags & BLOSC_MEMCPYED)) && context->parent_context->thread_giveup_code > 0) {
1862 /* Update global counter for all threads (decompression only) */
1863 pthread_mutex_lock(&context->parent_context->count_mutex);
1864 context->parent_context->num_output_bytes += ntbytes;
1865 pthread_mutex_unlock(&context->parent_context->count_mutex);
1866 }
1867
1868 /* Meeting point for all threads (wait for finalization) */
1869 WAIT_FINISH(NULL, context->parent_context);
1870 }
1871
1872 /* Cleanup our working space and context */
1873 my_free(context->tmp);
1874 my_free(context);
1875
1876 return(NULL);
1877 }
1878
1879
init_threads(struct blosc_context * context)1880 static int init_threads(struct blosc_context* context)
1881 {
1882 int32_t tid;
1883 int rc2;
1884 int32_t ebsize;
1885 struct thread_context* thread_context;
1886
1887 /* Initialize mutex and condition variable objects */
1888 pthread_mutex_init(&context->count_mutex, NULL);
1889
1890 /* Set context thread sentinels */
1891 context->thread_giveup_code = 1;
1892 context->thread_nblock = -1;
1893
1894 /* Barrier initialization */
1895 #ifdef _POSIX_BARRIERS_MINE
1896 pthread_barrier_init(&context->barr_init, NULL, context->numthreads+1);
1897 pthread_barrier_init(&context->barr_finish, NULL, context->numthreads+1);
1898 #else
1899 pthread_mutex_init(&context->count_threads_mutex, NULL);
1900 pthread_cond_init(&context->count_threads_cv, NULL);
1901 context->count_threads = 0; /* Reset threads counter */
1902 #endif
1903
1904 #if !defined(_WIN32)
1905 /* Initialize and set thread detached attribute */
1906 pthread_attr_init(&context->ct_attr);
1907 pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
1908 #endif
1909
1910 /* Finally, create the threads in detached state */
1911 for (tid = 0; tid < context->numthreads; tid++) {
1912 context->tids[tid] = tid;
1913
1914 /* Create a thread context thread owns context (will destroy when finished) */
1915 thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
1916 thread_context->parent_context = context;
1917 thread_context->tid = tid;
1918
1919 ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
1920 thread_context->tmp = my_malloc(context->blocksize + ebsize + context->blocksize);
1921 thread_context->tmp2 = thread_context->tmp + context->blocksize;
1922 thread_context->tmp3 = thread_context->tmp + context->blocksize + ebsize;
1923 thread_context->tmpblocksize = context->blocksize;
1924
1925 #if !defined(_WIN32)
1926 rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc, (void *)thread_context);
1927 #else
1928 rc2 = pthread_create(&context->threads[tid], NULL, t_blosc, (void *)thread_context);
1929 #endif
1930 if (rc2) {
1931 fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc2);
1932 fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
1933 return(-1);
1934 }
1935 }
1936
1937
1938 return(0);
1939 }
1940
blosc_get_nthreads(void)1941 int blosc_get_nthreads(void)
1942 {
1943 int ret = g_threads;
1944
1945 return ret;
1946 }
1947
blosc_set_nthreads(int nthreads_new)1948 int blosc_set_nthreads(int nthreads_new)
1949 {
1950 int ret = g_threads;
1951
1952 /* Check if should initialize */
1953 if (!g_initlib) blosc_init();
1954
1955 if (nthreads_new != ret){
1956 /* Re-initialize Blosc */
1957 blosc_destroy();
1958 blosc_init();
1959 g_threads = nthreads_new;
1960 }
1961
1962 return ret;
1963 }
1964
blosc_set_nthreads_(struct blosc_context * context)1965 int blosc_set_nthreads_(struct blosc_context* context)
1966 {
1967 if (context->numthreads > BLOSC_MAX_THREADS) {
1968 fprintf(stderr,
1969 "Error. nthreads cannot be larger than BLOSC_MAX_THREADS (%d)",
1970 BLOSC_MAX_THREADS);
1971 return -1;
1972 }
1973 else if (context->numthreads <= 0) {
1974 fprintf(stderr, "Error. nthreads must be a positive integer");
1975 return -1;
1976 }
1977
1978 /* Launch a new pool of threads */
1979 if (context->numthreads > 1 && context->numthreads != context->threads_started) {
1980 blosc_release_threadpool(context);
1981 if (init_threads(context) < 0) {
1982 return -1;
1983 }
1984 }
1985
1986 /* We have now started the threads */
1987 context->threads_started = context->numthreads;
1988
1989 return context->numthreads;
1990 }
1991
blosc_get_compressor(void)1992 const char* blosc_get_compressor(void)
1993 {
1994 const char* compname;
1995 blosc_compcode_to_compname(g_compressor, &compname);
1996
1997 return compname;
1998 }
1999
blosc_set_compressor(const char * compname)2000 int blosc_set_compressor(const char *compname)
2001 {
2002 int code = blosc_compname_to_compcode(compname);
2003
2004 g_compressor = code;
2005
2006 /* Check if should initialize */
2007 if (!g_initlib) blosc_init();
2008
2009 return code;
2010 }
2011
blosc_list_compressors(void)2012 const char* blosc_list_compressors(void)
2013 {
2014 static int compressors_list_done = 0;
2015 static char ret[256];
2016
2017 if (compressors_list_done) return ret;
2018 ret[0] = '\0';
2019 strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
2020 #if defined(HAVE_LZ4)
2021 strcat(ret, ","); strcat(ret, BLOSC_LZ4_COMPNAME);
2022 strcat(ret, ","); strcat(ret, BLOSC_LZ4HC_COMPNAME);
2023 #endif /* HAVE_LZ4 */
2024 #if defined(HAVE_SNAPPY)
2025 strcat(ret, ","); strcat(ret, BLOSC_SNAPPY_COMPNAME);
2026 #endif /* HAVE_SNAPPY */
2027 #if defined(HAVE_ZLIB)
2028 strcat(ret, ","); strcat(ret, BLOSC_ZLIB_COMPNAME);
2029 #endif /* HAVE_ZLIB */
2030 #if defined(HAVE_ZSTD)
2031 strcat(ret, ","); strcat(ret, BLOSC_ZSTD_COMPNAME);
2032 #endif /* HAVE_ZSTD */
2033 compressors_list_done = 1;
2034 return ret;
2035 }
2036
blosc_get_version_string(void)2037 const char* blosc_get_version_string(void)
2038 {
2039 return BLOSC_VERSION_STRING;
2040 }
2041
/* Look up the compression library that provides compressor `compname` and
   report the library's name and version.

   On success returns the library code.  For each non-NULL output pointer it
   stores a strdup()'ed copy of the library name (`*complib`) or version
   (`*version`).  These are heap copies the caller must free().  The version
   is "unknown" when the library does not expose one at compile time.

   If the library is not compiled into this build (no matching branch
   below), stores NULL into each non-NULL output pointer and returns -1.

   NOTE(review): strdup() failures are not checked, so an output may be NULL
   even on success if memory is exhausted. */
int blosc_get_complib_info(const char *compname, char **complib, char **version)
{
  int clibcode;
  const char *clibname;
  const char *clibversion = "unknown";

  /* Scratch buffer for "major.minor.release" strings.  It is only declared
     when some branch below formats a version into it, to avoid an
     unused-variable warning otherwise. */
#if (defined(HAVE_LZ4) && defined(LZ4_VERSION_MAJOR)) || (defined(HAVE_SNAPPY) && defined(SNAPPY_VERSION)) || defined(ZSTD_VERSION_MAJOR)
  char sbuffer[256];
#endif

  clibcode = compname_to_clibcode(compname);
  clibname = clibcode_to_clibname(clibcode);

  /* complib version */
  if (clibcode == BLOSC_BLOSCLZ_LIB) {
    clibversion = BLOSCLZ_VERSION_STRING;
  }
#if defined(HAVE_LZ4)
  else if (clibcode == BLOSC_LZ4_LIB) {
#if defined(LZ4_VERSION_MAJOR)
    sprintf(sbuffer, "%d.%d.%d",
            LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
    clibversion = sbuffer;
#endif /* LZ4_VERSION_MAJOR */
  }
#endif /* HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
  else if (clibcode == BLOSC_SNAPPY_LIB) {
#if defined(SNAPPY_VERSION)
    sprintf(sbuffer, "%d.%d.%d", SNAPPY_MAJOR, SNAPPY_MINOR, SNAPPY_PATCHLEVEL);
    clibversion = sbuffer;
#endif /* SNAPPY_VERSION */
  }
#endif /* HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
  else if (clibcode == BLOSC_ZLIB_LIB) {
    clibversion = ZLIB_VERSION;
  }
#endif /* HAVE_ZLIB */
#if defined(HAVE_ZSTD)
  else if (clibcode == BLOSC_ZSTD_LIB) {
    sprintf(sbuffer, "%d.%d.%d",
            ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
    clibversion = sbuffer;
  }
#endif /* HAVE_ZSTD */
  else {
    /* Unsupported library */
    if (complib != NULL) *complib = NULL;
    if (version != NULL) *version = NULL;
    return -1;
  }

  /* Copy out: sbuffer is a local, so the version must be duplicated before
     returning. */
  if (complib != NULL) *complib = strdup(clibname);
  if (version != NULL) *version = strdup(clibversion);

  return clibcode;
}
2100
2101 /* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */
blosc_cbuffer_sizes(const void * cbuffer,size_t * nbytes,size_t * cbytes,size_t * blocksize)2102 void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes,
2103 size_t *cbytes, size_t *blocksize)
2104 {
2105 uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */
2106 uint8_t version = _src[0]; /* version of header */
2107
2108 if (version != BLOSC_VERSION_FORMAT) {
2109 *nbytes = *blocksize = *cbytes = 0;
2110 return;
2111 }
2112
2113 /* Read the interesting values */
2114 *nbytes = (size_t)sw32_(_src + 4); /* uncompressed buffer size */
2115 *blocksize = (size_t)sw32_(_src + 8); /* block size */
2116 *cbytes = (size_t)sw32_(_src + 12); /* compressed buffer size */
2117 }
2118
blosc_cbuffer_validate(const void * cbuffer,size_t cbytes,size_t * nbytes)2119 int blosc_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) {
2120 size_t header_cbytes, header_blocksize;
2121 if (cbytes < BLOSC_MIN_HEADER_LENGTH) return -1;
2122 blosc_cbuffer_sizes(cbuffer, nbytes, &header_cbytes, &header_blocksize);
2123 if (header_cbytes != cbytes) return -1;
2124 if (*nbytes > BLOSC_MAX_BUFFERSIZE) return -1;
2125 return 0;
2126 }
2127
2128 /* Return `typesize` and `flags` from a compressed buffer. */
blosc_cbuffer_metainfo(const void * cbuffer,size_t * typesize,int * flags)2129 void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize,
2130 int *flags)
2131 {
2132 uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */
2133
2134 uint8_t version = _src[0]; /* version of header */
2135
2136 if (version != BLOSC_VERSION_FORMAT) {
2137 *flags = *typesize = 0;
2138 return;
2139 }
2140
2141 /* Read the interesting values */
2142 *flags = (int)_src[2] & 7; /* first three flags */
2143 *typesize = (size_t)_src[3]; /* typesize */
2144 }
2145
2146
/* Return version information from a compressed buffer.  `*version`
   receives the Blosc format version (header byte 0) and `*versionlz` the
   format version of the internal compressor (header byte 1).  The header
   is not validated. */
void blosc_cbuffer_versions(const void *cbuffer, int *version,
                            int *versionlz)
{
  const uint8_t *header = (const uint8_t *)cbuffer;
  const int blosc_format = header[0];
  const int codec_format = header[1];

  *version = blosc_format;
  *versionlz = codec_format;
}
2157
2158
/* Return the name of the compressor library/format recorded in a
   compressed buffer: the library code lives in the top three bits of
   header byte 2 and is mapped to a name by clibcode_to_clibname(). */
const char *blosc_cbuffer_complib(const void *cbuffer)
{
  const uint8_t *header = (const uint8_t *)cbuffer;
  const int clibcode = (header[2] & 0xe0) >> 5;

  return clibcode_to_clibname(clibcode);
}
2171
2172 /* Get the internal blocksize to be used during compression. 0 means
2173 that an automatic blocksize is computed internally. */
blosc_get_blocksize(void)2174 int blosc_get_blocksize(void)
2175 {
2176 return (int)g_force_blocksize;
2177 }
2178
2179 /* Force the use of a specific blocksize. If 0, an automatic
2180 blocksize will be used (the default). */
blosc_set_blocksize(size_t size)2181 void blosc_set_blocksize(size_t size)
2182 {
2183 g_force_blocksize = (int32_t)size;
2184 }
2185
2186 /* Force the use of a specific split mode. */
blosc_set_splitmode(int mode)2187 void blosc_set_splitmode(int mode)
2188 {
2189 g_splitmode = mode;
2190 }
2191
2192 /* Child global context is invalid and pool threads no longer exist post-fork.
2193 * Discard the old, inconsistent global context and global context mutex and
2194 * mark as uninitialized. Subsequent calls through `blosc_*` interfaces will
2195 * trigger re-init of the global context.
2196 *
2197 * All pthread interfaces have undefined behavior in child handler in current
2198 * posix standards: http://pubs.opengroup.org/onlinepubs/9699919799/
2199 */
blosc_atfork_child(void)2200 void blosc_atfork_child(void) {
2201 if (!g_initlib) return;
2202
2203 g_initlib = 0;
2204
2205 my_free(global_comp_mutex);
2206 global_comp_mutex = NULL;
2207
2208 my_free(g_global_context);
2209 g_global_context = NULL;
2210
2211 }
2212
blosc_init(void)2213 void blosc_init(void)
2214 {
2215 /* Return if we are already initialized */
2216 if (g_initlib) return;
2217
2218 global_comp_mutex = (pthread_mutex_t*)my_malloc(sizeof(pthread_mutex_t));
2219 pthread_mutex_init(global_comp_mutex, NULL);
2220
2221 g_global_context = (struct blosc_context*)my_malloc(sizeof(struct blosc_context));
2222 g_global_context->threads_started = 0;
2223
2224 #if !defined(_WIN32)
2225 /* atfork handlers are only be registered once, though multiple re-inits may
2226 * occur via blosc_destroy/blosc_init. */
2227 if (!g_atfork_registered) {
2228 g_atfork_registered = 1;
2229 pthread_atfork(NULL, NULL, &blosc_atfork_child);
2230 }
2231 #endif
2232
2233 g_initlib = 1;
2234 }
2235
blosc_destroy(void)2236 void blosc_destroy(void)
2237 {
2238 /* Return if Blosc is not initialized */
2239 if (!g_initlib) return;
2240
2241 g_initlib = 0;
2242
2243 blosc_release_threadpool(g_global_context);
2244 my_free(g_global_context);
2245 g_global_context = NULL;
2246
2247 pthread_mutex_destroy(global_comp_mutex);
2248 my_free(global_comp_mutex);
2249 global_comp_mutex = NULL;
2250 }
2251
/* Stop and join the worker pool of `context`, then destroy the pool's
   synchronization objects and thread attributes.

   Does nothing but reset context->threads_started when no threads were
   started.  pthread_join() failures are reported on stderr but do not stop
   the cleanup.  Returns 0.  NOTE(review): WAIT_INIT is a macro defined
   elsewhere and presumably returns -1 from this function if the initial
   sync fails -- confirm against its definition.

   NOTE(review): end_threads is left set to 1 here; presumably
   init_threads() resets it before a new pool starts -- confirm. */
int blosc_release_threadpool(struct blosc_context* context)
{
  int32_t t;
  void* status;
  int rc;
  int rc2;
  (void)rc; // just to avoid 'unused-variable' warning

  if (context->threads_started > 0)
  {
    /* Tell all existing threads to finish */
    context->end_threads = 1;

    /* Sync threads: release the workers waiting at the init point so they
       can observe end_threads (rc is presumably used by this macro). */
    WAIT_INIT(-1, context);

    /* Join exiting threads */
    for (t=0; t<context->threads_started; t++) {
      rc2 = pthread_join(context->threads[t], &status);
      if (rc2) {
        fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2);
        fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
      }
    }

    /* Release mutex and condition variable objects */
    pthread_mutex_destroy(&context->count_mutex);

    /* Barriers (or the mutex/condvar pair that emulates them) */
#ifdef _POSIX_BARRIERS_MINE
    pthread_barrier_destroy(&context->barr_init);
    pthread_barrier_destroy(&context->barr_finish);
#else
    pthread_mutex_destroy(&context->count_threads_mutex);
    pthread_cond_destroy(&context->count_threads_cv);
#endif

    /* Thread attributes (only created on non-Windows builds) */
#if !defined(_WIN32)
    pthread_attr_destroy(&context->ct_attr);
#endif

  }

  /* No worker threads remain for this context */
  context->threads_started = 0;

  return 0;
}
2300
blosc_free_resources(void)2301 int blosc_free_resources(void)
2302 {
2303 /* Return if Blosc is not initialized */
2304 if (!g_initlib) return -1;
2305
2306 return blosc_release_threadpool(g_global_context);
2307 }
2308