1 /*********************************************************************
2 Blosc - Blocked Shuffling and Compression Library
3
4 Author: Francesc Alted <francesc@blosc.org>
5 Creation date: 2009-05-20
6
7 See LICENSES/BLOSC.txt for details about copyright and rights to use.
8 **********************************************************************/
9
10
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <errno.h>
14 #include <string.h>
15 #include <sys/types.h>
16 #include <assert.h>
17
18 #include "fastcopy.h"
19
20 #if defined(USING_CMAKE)
21 #include "config.h"
22 #endif /* USING_CMAKE */
23 #include "blosc.h"
24 #include "shuffle.h"
25 #include "blosclz.h"
26 #if defined(HAVE_LZ4)
27 #include "lz4.h"
28 #include "lz4hc.h"
29 #endif /* HAVE_LZ4 */
30 #if defined(HAVE_SNAPPY)
31 #include "snappy-c.h"
32 #endif /* HAVE_SNAPPY */
33 #if defined(HAVE_ZLIB)
34 #include "zlib.h"
35 #endif /* HAVE_ZLIB */
36 #if defined(HAVE_ZSTD)
37 #include "zstd.h"
38 #endif /* HAVE_ZSTD */
39
40 #if defined(_WIN32) && !defined(__MINGW32__)
41 #include <windows.h>
42 #include <malloc.h>
43
44 /* stdint.h only available in VS2010 (VC++ 16.0) and newer */
45 #if defined(_MSC_VER) && _MSC_VER < 1600
46 #include "win32/stdint-windows.h"
47 #else
48 #include <stdint.h>
49 #endif
50
51 #include <process.h>
52 #define getpid _getpid
53 #else
54 #include <stdint.h>
55 #include <unistd.h>
56 #include <inttypes.h>
57 #endif /* _WIN32 */
58
59 /* Include the win32/pthread.h library for all the Windows builds. See #224. */
60 #if defined(_WIN32)
61 #include "win32/pthread.h"
62 #include "win32/pthread.c"
63 #else
64 #include <pthread.h>
65 #endif
66
67
/* Handy byte-size units */
#define KB 1024
#define MB (1024 * (KB))

/* Smallest buffer size that is worth compressing */
#define MIN_BUFFERSIZE 128       /* Cannot be smaller than 66 */

/* Upper bound on the number of splits a block is divided into when
   compressing */
#define MAX_SPLITS 16            /* Cannot be larger than 128 */

/* Assumed L1 cache size; 32 KB is a common value. */
#define L1 (32 * (KB))

/* POSIX barriers misbehave when the feature-test symbol is exactly 200112L.
   This needs more investigation; for now barriers are only enabled for the
   other values accepted by the test below. */
#if defined(_POSIX_BARRIERS) && ( (_POSIX_BARRIERS - 20012L) >= 0 && _POSIX_BARRIERS != 200112L)
#define _POSIX_BARRIERS_MINE
#endif
86 /* Synchronization variables */
87
88
89 struct blosc_context {
90 int32_t compress; /* 1 if we are doing compression 0 if decompress */
91
92 const uint8_t* src;
93 uint8_t* dest; /* The current pos in the destination buffer */
94 uint8_t* header_flags; /* Flags for header */
95 int compversion; /* Compressor version byte, only used during decompression */
96 int32_t sourcesize; /* Number of bytes in source buffer (or uncompressed bytes in compressed file) */
97 int32_t compressedsize; /* Number of bytes of compressed data (only used when decompressing) */
98 int32_t nblocks; /* Number of total blocks in buffer */
99 int32_t leftover; /* Extra bytes at end of buffer */
100 int32_t blocksize; /* Length of the block in bytes */
101 int32_t typesize; /* Type size */
102 int32_t num_output_bytes; /* Counter for the number of output bytes */
103 int32_t destsize; /* Maximum size for destination buffer */
104 uint8_t* bstarts; /* Start of the buffer past header info */
105 int32_t compcode; /* Compressor code to use */
106 int clevel; /* Compression level (1-9) */
107 /* Function to use for decompression. Only used when decompression */
108 int (*decompress_func)(const void* input, int compressed_length, void* output,
109 int maxout);
110
111 /* Threading */
112 int32_t numthreads;
113 int32_t threads_started;
114 int32_t end_threads;
115 pthread_t threads[BLOSC_MAX_THREADS];
116 int32_t tids[BLOSC_MAX_THREADS];
117 pthread_mutex_t count_mutex;
118 #ifdef _POSIX_BARRIERS_MINE
119 pthread_barrier_t barr_init;
120 pthread_barrier_t barr_finish;
121 #else
122 int32_t count_threads;
123 pthread_mutex_t count_threads_mutex;
124 pthread_cond_t count_threads_cv;
125 #endif
126 #if !defined(_WIN32)
127 pthread_attr_t ct_attr; /* creation time attrs for threads */
128 #endif
129 int32_t thread_giveup_code; /* error code when give up */
130 int32_t thread_nblock; /* block counter */
131 };
132
/* Per-thread state: a link back to the owning job plus the thread's private
   scratch buffers. */
struct thread_context {
  struct blosc_context* parent_context;
  int32_t tid;
  uint8_t* tmp;
  uint8_t* tmp2;
  uint8_t* tmp3;
  int32_t tmpblocksize;   /* Tracks how big the temporary buffers are */
};
141
142 /* Global context for non-contextual API */
143 static struct blosc_context* g_global_context;
144 static pthread_mutex_t* global_comp_mutex;
145 static int32_t g_compressor = BLOSC_BLOSCLZ; /* the compressor to use by default */
146 static int32_t g_threads = 1;
147 static int32_t g_force_blocksize = 0;
148 static int32_t g_initlib = 0;
149 static int32_t g_atfork_registered = 0;
150 static int32_t g_splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
151
152
153
154 /* Wrapped function to adjust the number of threads used by blosc */
155 int blosc_set_nthreads_(struct blosc_context*);
156
157 /* Releases the global threadpool */
158 int blosc_release_threadpool(struct blosc_context* context);
159
160 /* Macros for synchronization */
161
/* Block until every thread has reached the initialization point.
 *
 * Barrier flavour: expects an `int rc` in the calling scope and makes the
 * calling function return RET_VAL if the barrier wait fails.
 *
 * Fallback flavour: each arrival bumps `count_threads` and sleeps on the
 * condition variable; the arrival that finds the counter already at
 * `numthreads` wakes everybody up.  RET_VAL is not used.
 * NOTE(review): the pthread_cond_wait() below is not wrapped in a predicate
 * loop, so it relies on there being no spurious wake-ups -- confirm.
 *
 * Both flavours expand to several statements (not a single expression). */
#ifdef _POSIX_BARRIERS_MINE
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                                   \
  rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_init);                   \
  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {                   \
    printf("Could not wait on barrier (init): %d\n", rc);                 \
    return((RET_VAL));                                                    \
  }
#else
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                                   \
  pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);                \
  if ((CONTEXT_PTR)->count_threads < (CONTEXT_PTR)->numthreads) {         \
    (CONTEXT_PTR)->count_threads++;                                       \
    pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,                   \
                      &(CONTEXT_PTR)->count_threads_mutex);               \
  }                                                                       \
  else {                                                                  \
    pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv);             \
  }                                                                       \
  pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);
#endif
182
/* Block until every thread has finished its work.
 *
 * Barrier flavour: expects an `int rc` in the calling scope and makes the
 * calling function return RET_VAL if the barrier wait fails.
 *
 * Fallback flavour: each arrival decrements `count_threads` and sleeps on the
 * condition variable; the arrival that finds the counter at zero wakes
 * everybody up.  RET_VAL is not used.
 * NOTE(review): the pthread_cond_wait() below is not wrapped in a predicate
 * loop, so it relies on there being no spurious wake-ups -- confirm.
 *
 * Both flavours expand to several statements (not a single expression). */
#ifdef _POSIX_BARRIERS_MINE
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                                 \
  rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_finish);                 \
  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {                   \
    printf("Could not wait on barrier (finish)\n");                       \
    return((RET_VAL));                                                    \
  }
#else
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                                 \
  pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);                \
  if ((CONTEXT_PTR)->count_threads > 0) {                                 \
    (CONTEXT_PTR)->count_threads--;                                       \
    pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,                   \
                      &(CONTEXT_PTR)->count_threads_mutex);               \
  }                                                                       \
  else {                                                                  \
    pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv);             \
  }                                                                       \
  pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);
#endif
203
204
/* Portable allocator for scratch buffers.
 *
 * Requests 32-byte alignment (because AVX2 is supported) where the platform
 * offers an aligned allocator, and falls back to plain malloc() otherwise.
 * Returns NULL, after printing a message, when the allocation fails.
 * Release the result with my_free(). */
static uint8_t *my_malloc(size_t size)
{
  void *mem = NULL;
  int status = 0;

#if defined(_WIN32)
  /* The (void *) cast avoids a warning with MINGW */
  mem = (void *)_aligned_malloc(size, 32);
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
  /* posix_memalign is available on this platform */
  status = posix_memalign(&mem, 32, size);
#else
  mem = malloc(size);
#endif  /* _WIN32 */

  if (status != 0 || mem == NULL) {
    printf("Error allocating memory!");
    return NULL;
  }

  return (uint8_t *)mem;
}
229
230
/* Counterpart of my_malloc(): hands the block back to whichever allocator
   family my_malloc() used on this platform. */
static void my_free(void *block)
{
#if !defined(_WIN32)
  free(block);
#else
  _aligned_free(block);
#endif  /* _WIN32 */
}
240
241
/* Decode the 4 bytes at `pa`, stored least-significant byte first, into a
   host-order int32_t.  Gives the same value on little- and big-endian hosts. */
static int32_t sw32_(const uint8_t *pa)
{
  int32_t value;
  const uint32_t bits = (uint32_t)pa[0]
                      | ((uint32_t)pa[1] << 8)
                      | ((uint32_t)pa[2] << 16)
                      | ((uint32_t)pa[3] << 24);

  /* Reinterpret the bit pattern as signed without relying on an
     implementation-defined integer conversion. */
  memcpy(&value, &bits, sizeof value);
  return value;
}
266
267
/* Encode `a` into the 4 bytes at `dest`, least-significant byte first,
   whatever the endianness of the host. */
static void _sw32(uint8_t* dest, int32_t a)
{
  const uint32_t bits = (uint32_t)a;
  int k;

  for (k = 0; k < 4; k++) {
    dest[k] = (uint8_t)((bits >> (8 * k)) & 0xffu);
  }
}
290
291 /*
292 * Conversion routines between compressor and compression libraries
293 */
294
295 /* Return the library code associated with the compressor name */
compname_to_clibcode(const char * compname)296 static int compname_to_clibcode(const char *compname)
297 {
298 if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
299 return BLOSC_BLOSCLZ_LIB;
300 if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
301 return BLOSC_LZ4_LIB;
302 if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
303 return BLOSC_LZ4_LIB;
304 if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0)
305 return BLOSC_SNAPPY_LIB;
306 if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
307 return BLOSC_ZLIB_LIB;
308 if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
309 return BLOSC_ZSTD_LIB;
310 return -1;
311 }
312
313 /* Return the library name associated with the compressor code */
clibcode_to_clibname(int clibcode)314 static const char *clibcode_to_clibname(int clibcode)
315 {
316 if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
317 if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
318 if (clibcode == BLOSC_SNAPPY_LIB) return BLOSC_SNAPPY_LIBNAME;
319 if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
320 if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
321 return NULL; /* should never happen */
322 }
323
324
325 /*
326 * Conversion routines between compressor names and compressor codes
327 */
328
329 /* Get the compressor name associated with the compressor code */
blosc_compcode_to_compname(int compcode,const char ** compname)330 int blosc_compcode_to_compname(int compcode, const char **compname)
331 {
332 int code = -1; /* -1 means non-existent compressor code */
333 const char *name = NULL;
334
335 /* Map the compressor code */
336 if (compcode == BLOSC_BLOSCLZ)
337 name = BLOSC_BLOSCLZ_COMPNAME;
338 else if (compcode == BLOSC_LZ4)
339 name = BLOSC_LZ4_COMPNAME;
340 else if (compcode == BLOSC_LZ4HC)
341 name = BLOSC_LZ4HC_COMPNAME;
342 else if (compcode == BLOSC_SNAPPY)
343 name = BLOSC_SNAPPY_COMPNAME;
344 else if (compcode == BLOSC_ZLIB)
345 name = BLOSC_ZLIB_COMPNAME;
346 else if (compcode == BLOSC_ZSTD)
347 name = BLOSC_ZSTD_COMPNAME;
348
349 *compname = name;
350
351 /* Guess if there is support for this code */
352 if (compcode == BLOSC_BLOSCLZ)
353 code = BLOSC_BLOSCLZ;
354 #if defined(HAVE_LZ4)
355 else if (compcode == BLOSC_LZ4)
356 code = BLOSC_LZ4;
357 else if (compcode == BLOSC_LZ4HC)
358 code = BLOSC_LZ4HC;
359 #endif /* HAVE_LZ4 */
360 #if defined(HAVE_SNAPPY)
361 else if (compcode == BLOSC_SNAPPY)
362 code = BLOSC_SNAPPY;
363 #endif /* HAVE_SNAPPY */
364 #if defined(HAVE_ZLIB)
365 else if (compcode == BLOSC_ZLIB)
366 code = BLOSC_ZLIB;
367 #endif /* HAVE_ZLIB */
368 #if defined(HAVE_ZSTD)
369 else if (compcode == BLOSC_ZSTD)
370 code = BLOSC_ZSTD;
371 #endif /* HAVE_ZSTD */
372
373 return code;
374 }
375
376 /* Get the compressor code for the compressor name. -1 if it is not available */
blosc_compname_to_compcode(const char * compname)377 int blosc_compname_to_compcode(const char *compname)
378 {
379 int code = -1; /* -1 means non-existent compressor code */
380
381 if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
382 code = BLOSC_BLOSCLZ;
383 }
384 #if defined(HAVE_LZ4)
385 else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
386 code = BLOSC_LZ4;
387 }
388 else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
389 code = BLOSC_LZ4HC;
390 }
391 #endif /* HAVE_LZ4 */
392 #if defined(HAVE_SNAPPY)
393 else if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0) {
394 code = BLOSC_SNAPPY;
395 }
396 #endif /* HAVE_SNAPPY */
397 #if defined(HAVE_ZLIB)
398 else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
399 code = BLOSC_ZLIB;
400 }
401 #endif /* HAVE_ZLIB */
402 #if defined(HAVE_ZSTD)
403 else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
404 code = BLOSC_ZSTD;
405 }
406 #endif /* HAVE_ZSTD */
407
408 return code;
409 }
410
411
412 #if defined(HAVE_LZ4)
/* Thin adapter around LZ4_compress_fast(): forwards the buffers, the sizes
   narrowed to int, and the acceleration factor, and returns its result. */
static int lz4_wrap_compress(const char* input, size_t input_length,
                             char* output, size_t maxout, int accel)
{
  const int srcsize = (int)input_length;
  const int dstcapacity = (int)maxout;

  return LZ4_compress_fast(input, output, srcsize, dstcapacity, accel);
}
421
/* Thin adapter around LZ4_compress_HC().
 *
 * Returns -1 when `input_length` does not fit in the `int` length that is
 * passed on to LZ4; otherwise returns whatever LZ4_compress_HC() returns.
 *
 * The previous guard was written as `(size_t)(2<<30)`, which overflows a
 * 32-bit signed int before the cast (undefined behaviour), so the limit was
 * not reliably enforced. */
static int lz4hc_wrap_compress(const char* input, size_t input_length,
                               char* output, size_t maxout, int clevel)
{
  int cbytes;
  if (input_length > (size_t)INT32_MAX)
    return -1;   /* input of 2 GB or more is not supported */
  /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
   * but levels larger than 9 do not buy much compression. */
  cbytes = LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
                           clevel);
  return cbytes;
}
434
/* Bounds-checked LZ4 decompression: forwards to LZ4_decompress_safe() and
   returns its result unchanged. */
static int lz4_wrap_decompress(const void* input, int compressed_length,
                               void* output, int maxout)
{
  const int nbytes =
      LZ4_decompress_safe(input, output, compressed_length, maxout);

  return nbytes;
}
440
/* LZ4 decompression through LZ4_decompress_fast(), which is given only the
   expected output size.  Returns `maxout` when the value reported by LZ4
   equals `compressed_length`, and 0 otherwise. */
static int lz4_wrap_decompress_unsafe(const void* input, int compressed_length,
                                      void* output, int maxout)
{
  const int consumed = LZ4_decompress_fast(input, output, (int)maxout);

  return (consumed == compressed_length) ? (int)maxout : 0;
}
451
452 #endif /* HAVE_LZ4 */
453
454 #if defined(HAVE_SNAPPY)
snappy_wrap_compress(const char * input,size_t input_length,char * output,size_t maxout)455 static int snappy_wrap_compress(const char* input, size_t input_length,
456 char* output, size_t maxout)
457 {
458 snappy_status status;
459 size_t cl = maxout;
460 status = snappy_compress(input, input_length, output, &cl);
461 if (status != SNAPPY_OK){
462 return 0;
463 }
464 return (int)cl;
465 }
466
snappy_wrap_decompress(const void * input,int compressed_length,void * output,int maxout)467 static int snappy_wrap_decompress(const void* input, int compressed_length,
468 void* output, int maxout)
469 {
470 snappy_status status;
471 size_t ul = maxout;
472 status = snappy_uncompress(input, compressed_length, output, &ul);
473 if (status != SNAPPY_OK){
474 return 0;
475 }
476 return (int)ul;
477 }
478 #endif /* HAVE_SNAPPY */
479
480 #if defined(HAVE_ZLIB)
481 /* zlib is not very respectful with sharing name space with others.
482 Fortunately, its names do not collide with those already in blosc. */
zlib_wrap_compress(const char * input,size_t input_length,char * output,size_t maxout,int clevel)483 static int zlib_wrap_compress(const char* input, size_t input_length,
484 char* output, size_t maxout, int clevel)
485 {
486 int status;
487 uLongf cl = maxout;
488 status = compress2(
489 (Bytef*)output, &cl, (Bytef*)input, (uLong)input_length, clevel);
490 if (status != Z_OK){
491 return 0;
492 }
493 return (int)cl;
494 }
495
zlib_wrap_decompress(const void * input,int compressed_length,void * output,int maxout)496 static int zlib_wrap_decompress(const void* input, int compressed_length,
497 void* output, int maxout) {
498 int status;
499 uLongf ul = maxout;
500 status = uncompress(
501 (Bytef*)output, &ul, (Bytef*)input, (uLong)compressed_length);
502 if (status != Z_OK){
503 return 0;
504 }
505 return (int)ul;
506 }
507 #endif /* HAVE_ZLIB */
508
509 #if defined(HAVE_ZSTD)
/* Adapter around ZSTD_compress().
 *
 * Blosc levels below 9 are mapped to the Zstd level `2 * clevel - 1`; level 9
 * and above map to ZSTD_maxCLevel().  Returns the compressed length, or 0
 * when Zstd reports an error. */
static int zstd_wrap_compress(const char* input, size_t input_length,
                              char* output, size_t maxout, int clevel)
{
  size_t result;
  int zlevel;

  if (clevel < 9) {
    zlevel = clevel * 2 - 1;
  }
  else {
    zlevel = ZSTD_maxCLevel();
  }

  /* Make the level 8 close enough to maxCLevel.
     NOTE(review): this test runs on the already-mapped level, which is odd
     for every input below 9, so it can only fire if ZSTD_maxCLevel() itself
     is 8.  It looks like it was meant to test the Blosc level before the
     mapping -- confirm before changing, since that alters the level used. */
  if (zlevel == 8) {
    zlevel = ZSTD_maxCLevel() - 2;
  }

  result = ZSTD_compress((void*)output, maxout, (void*)input, input_length,
                         zlevel);
  return ZSTD_isError(result) ? 0 : (int)result;
}
523
/* Adapter around ZSTD_decompress().  Returns the number of bytes produced,
   or 0 when Zstd reports an error. */
static int zstd_wrap_decompress(const void* input, int compressed_length,
                                void* output, int maxout)
{
  const size_t result = ZSTD_decompress((void*)output, maxout,
                                        (void*)input, compressed_length);

  return ZSTD_isError(result) ? 0 : (int)result;
}
534 #endif /* HAVE_ZSTD */
535
initialize_decompress_func(struct blosc_context * context,int unsafe)536 static int initialize_decompress_func(struct blosc_context* context,
537 int unsafe) {
538 int8_t header_flags = *(context->header_flags);
539 int32_t compformat = (header_flags & 0xe0) >> 5;
540 int compversion = context->compversion;
541
542 if (compformat == BLOSC_BLOSCLZ_FORMAT) {
543 if (compversion != BLOSC_BLOSCLZ_VERSION_FORMAT) {
544 return -9;
545 }
546 context->decompress_func =
547 unsafe ? &blosclz_decompress_unsafe : &blosclz_decompress;
548 return 0;
549 }
550 #if defined(HAVE_LZ4)
551 if (compformat == BLOSC_LZ4_FORMAT) {
552 if (compversion != BLOSC_LZ4_VERSION_FORMAT) {
553 return -9;
554 }
555 context->decompress_func =
556 unsafe ? &lz4_wrap_decompress_unsafe : &lz4_wrap_decompress;
557 return 0;
558 }
559 #endif /* HAVE_LZ4 */
560 #if defined(HAVE_SNAPPY)
561 if (compformat == BLOSC_SNAPPY_FORMAT) {
562 if (compversion != BLOSC_SNAPPY_VERSION_FORMAT) {
563 return -9;
564 }
565 context->decompress_func = &snappy_wrap_decompress;
566 return 0;
567 }
568 #endif /* HAVE_SNAPPY */
569 #if defined(HAVE_ZLIB)
570 if (compformat == BLOSC_ZLIB_FORMAT) {
571 if (compversion != BLOSC_ZLIB_VERSION_FORMAT) {
572 return -9;
573 }
574 context->decompress_func = &zlib_wrap_decompress;
575 return 0;
576 }
577 #endif /* HAVE_ZLIB */
578 #if defined(HAVE_ZSTD)
579 if (compformat == BLOSC_ZSTD_FORMAT) {
580 if (compversion != BLOSC_ZSTD_VERSION_FORMAT) {
581 return -9;
582 }
583 context->decompress_func = &zstd_wrap_decompress;
584 return 0;
585 }
586 #endif /* HAVE_ZSTD */
587 return -5; /* signals no decompression support */
588 }
589
590 /* Compute acceleration for blosclz */
get_accel(const struct blosc_context * context)591 static int get_accel(const struct blosc_context* context) {
592 int32_t clevel = context->clevel;
593
594 if (context->compcode == BLOSC_LZ4) {
595 /* This acceleration setting based on discussions held in:
596 * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
597 */
598 return (10 - clevel);
599 }
600 return 1;
601 }
602
603
604 /* Shuffle & compress a single block */
blosc_c(const struct blosc_context * context,int32_t blocksize,int32_t leftoverblock,int32_t ntbytes,int32_t maxbytes,const uint8_t * src,uint8_t * dest,uint8_t * tmp,uint8_t * tmp2)605 static int blosc_c(const struct blosc_context* context, int32_t blocksize,
606 int32_t leftoverblock, int32_t ntbytes, int32_t maxbytes,
607 const uint8_t *src, uint8_t *dest, uint8_t *tmp,
608 uint8_t *tmp2)
609 {
610 int8_t header_flags = *(context->header_flags);
611 int dont_split = (header_flags & 0x10) >> 4;
612 int32_t j, neblock, nsplits;
613 int32_t cbytes; /* number of compressed bytes in split */
614 int32_t ctbytes = 0; /* number of compressed bytes in block */
615 int32_t maxout;
616 int32_t typesize = context->typesize;
617 const uint8_t *_tmp = src;
618 const char *compname;
619 int accel;
620 int bscount;
621 int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
622 int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
623 (blocksize >= typesize));
624
625 if (doshuffle) {
626 /* Byte shuffling only makes sense if typesize > 1 */
627 blosc_internal_shuffle(typesize, blocksize, src, tmp);
628 _tmp = tmp;
629 }
630 /* We don't allow more than 1 filter at the same time (yet) */
631 else if (dobitshuffle) {
632 bscount = blosc_internal_bitshuffle(typesize, blocksize, src, tmp, tmp2);
633 if (bscount < 0)
634 return bscount;
635 _tmp = tmp;
636 }
637
638 /* Calculate acceleration for different compressors */
639 accel = get_accel(context);
640
641 /* The number of splits for this block */
642 if (!dont_split && !leftoverblock) {
643 nsplits = typesize;
644 }
645 else {
646 nsplits = 1;
647 }
648 neblock = blocksize / nsplits;
649 for (j = 0; j < nsplits; j++) {
650 dest += sizeof(int32_t);
651 ntbytes += (int32_t)sizeof(int32_t);
652 ctbytes += (int32_t)sizeof(int32_t);
653 maxout = neblock;
654 #if defined(HAVE_SNAPPY)
655 if (context->compcode == BLOSC_SNAPPY) {
656 /* TODO perhaps refactor this to keep the value stashed somewhere */
657 maxout = snappy_max_compressed_length(neblock);
658 }
659 #endif /* HAVE_SNAPPY */
660 if (ntbytes+maxout > maxbytes) {
661 maxout = maxbytes - ntbytes; /* avoid buffer overrun */
662 if (maxout <= 0) {
663 return 0; /* non-compressible block */
664 }
665 }
666 if (context->compcode == BLOSC_BLOSCLZ) {
667 cbytes = blosclz_compress(context->clevel, _tmp+j*neblock, neblock,
668 dest, maxout);
669 }
670 #if defined(HAVE_LZ4)
671 else if (context->compcode == BLOSC_LZ4) {
672 cbytes = lz4_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
673 (char *)dest, (size_t)maxout, accel);
674 }
675 else if (context->compcode == BLOSC_LZ4HC) {
676 cbytes = lz4hc_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
677 (char *)dest, (size_t)maxout,
678 context->clevel);
679 }
680 #endif /* HAVE_LZ4 */
681 #if defined(HAVE_SNAPPY)
682 else if (context->compcode == BLOSC_SNAPPY) {
683 cbytes = snappy_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
684 (char *)dest, (size_t)maxout);
685 }
686 #endif /* HAVE_SNAPPY */
687 #if defined(HAVE_ZLIB)
688 else if (context->compcode == BLOSC_ZLIB) {
689 cbytes = zlib_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
690 (char *)dest, (size_t)maxout,
691 context->clevel);
692 }
693 #endif /* HAVE_ZLIB */
694 #if defined(HAVE_ZSTD)
695 else if (context->compcode == BLOSC_ZSTD) {
696 cbytes = zstd_wrap_compress((char*)_tmp + j * neblock, (size_t)neblock,
697 (char*)dest, (size_t)maxout, context->clevel);
698 }
699 #endif /* HAVE_ZSTD */
700
701 else {
702 blosc_compcode_to_compname(context->compcode, &compname);
703 fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
704 fprintf(stderr, "compression support. Please use one having it.");
705 return -5; /* signals no compression support */
706 }
707
708 if (cbytes > maxout) {
709 /* Buffer overrun caused by compression (should never happen) */
710 return -1;
711 }
712 else if (cbytes < 0) {
713 /* cbytes should never be negative */
714 return -2;
715 }
716 else if (cbytes == 0 || cbytes == neblock) {
717 /* The compressor has been unable to compress data at all. */
718 /* Before doing the copy, check that we are not running into a
719 buffer overflow. */
720 if ((ntbytes+neblock) > maxbytes) {
721 return 0; /* Non-compressible data */
722 }
723 blosc_internal_fastcopy(dest, _tmp + j * neblock, neblock);
724 cbytes = neblock;
725 }
726 _sw32(dest - 4, cbytes);
727 dest += cbytes;
728 ntbytes += cbytes;
729 ctbytes += cbytes;
730 } /* Closes j < nsplits */
731
732 return ctbytes;
733 }
734
735 /* Decompress & unshuffle a single block */
blosc_d(struct blosc_context * context,int32_t blocksize,int32_t leftoverblock,const uint8_t * base_src,int32_t src_offset,uint8_t * dest,uint8_t * tmp,uint8_t * tmp2)736 static int blosc_d(struct blosc_context* context, int32_t blocksize,
737 int32_t leftoverblock, const uint8_t* base_src,
738 int32_t src_offset, uint8_t* dest, uint8_t* tmp,
739 uint8_t* tmp2) {
740 int8_t header_flags = *(context->header_flags);
741 int dont_split = (header_flags & 0x10) >> 4;
742 int32_t j, neblock, nsplits;
743 int32_t nbytes; /* number of decompressed bytes in split */
744 const int32_t compressedsize = context->compressedsize;
745 int32_t cbytes; /* number of compressed bytes in split */
746 int32_t ctbytes = 0; /* number of compressed bytes in block */
747 int32_t ntbytes = 0; /* number of uncompressed bytes in block */
748 uint8_t *_tmp = dest;
749 int32_t typesize = context->typesize;
750 int bscount;
751 int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
752 int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
753 (blocksize >= typesize));
754 const uint8_t* src;
755
756 if (doshuffle || dobitshuffle) {
757 _tmp = tmp;
758 }
759
760 /* The number of splits for this block */
761 if (!dont_split &&
762 /* For compatibility with before the introduction of the split flag */
763 ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE) &&
764 !leftoverblock) {
765 nsplits = typesize;
766 }
767 else {
768 nsplits = 1;
769 }
770
771 neblock = blocksize / nsplits;
772 for (j = 0; j < nsplits; j++) {
773 /* Validate src_offset */
774 if (src_offset < 0 || src_offset > compressedsize - sizeof(int32_t)) {
775 return -1;
776 }
777 cbytes = sw32_(base_src + src_offset); /* amount of compressed bytes */
778 src_offset += sizeof(int32_t);
779 /* Validate cbytes */
780 if (cbytes < 0 || cbytes > context->compressedsize - src_offset) {
781 return -1;
782 }
783 ctbytes += (int32_t)sizeof(int32_t);
784 src = base_src + src_offset;
785 /* Uncompress */
786 if (cbytes == neblock) {
787 blosc_internal_fastcopy(_tmp, src, neblock);
788 nbytes = neblock;
789 }
790 else {
791 nbytes = context->decompress_func(src, cbytes, _tmp, neblock);
792 /* Check that decompressed bytes number is correct */
793 if (nbytes != neblock) {
794 return -2;
795 }
796 }
797 src_offset += cbytes;
798 ctbytes += cbytes;
799 _tmp += nbytes;
800 ntbytes += nbytes;
801 } /* Closes j < nsplits */
802
803 if (doshuffle) {
804 blosc_internal_unshuffle(typesize, blocksize, tmp, dest);
805 }
806 else if (dobitshuffle) {
807 bscount = blosc_internal_bitunshuffle(typesize, blocksize, tmp, dest, tmp2);
808 if (bscount < 0)
809 return bscount;
810 }
811
812 /* Return the number of uncompressed bytes */
813 return ntbytes;
814 }
815
816 /* Serial version for compression/decompression */
serial_blosc(struct blosc_context * context)817 static int serial_blosc(struct blosc_context* context)
818 {
819 int32_t j, bsize, leftoverblock;
820 int32_t cbytes;
821
822 int32_t ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
823 int32_t ntbytes = context->num_output_bytes;
824
825 uint8_t *tmp = my_malloc(context->blocksize + ebsize);
826 uint8_t *tmp2 = tmp + context->blocksize;
827
828 for (j = 0; j < context->nblocks; j++) {
829 if (context->compress && !(*(context->header_flags) & BLOSC_MEMCPYED)) {
830 _sw32(context->bstarts + j * 4, ntbytes);
831 }
832 bsize = context->blocksize;
833 leftoverblock = 0;
834 if ((j == context->nblocks - 1) && (context->leftover > 0)) {
835 bsize = context->leftover;
836 leftoverblock = 1;
837 }
838 if (context->compress) {
839 if (*(context->header_flags) & BLOSC_MEMCPYED) {
840 /* We want to memcpy only */
841 blosc_internal_fastcopy(context->dest + BLOSC_MAX_OVERHEAD + j * context->blocksize,
842 context->src + j * context->blocksize, bsize);
843 cbytes = bsize;
844 }
845 else {
846 /* Regular compression */
847 cbytes = blosc_c(context, bsize, leftoverblock, ntbytes,
848 context->destsize, context->src+j*context->blocksize,
849 context->dest+ntbytes, tmp, tmp2);
850 if (cbytes == 0) {
851 ntbytes = 0; /* uncompressible data */
852 break;
853 }
854 }
855 }
856 else {
857 if (*(context->header_flags) & BLOSC_MEMCPYED) {
858 /* We want to memcpy only */
859 blosc_internal_fastcopy(context->dest + j * context->blocksize,
860 context->src + BLOSC_MAX_OVERHEAD + j * context->blocksize, bsize);
861 cbytes = bsize;
862 }
863 else {
864 /* Regular decompression */
865 cbytes = blosc_d(context, bsize, leftoverblock, context->src,
866 sw32_(context->bstarts + j * 4),
867 context->dest + j * context->blocksize, tmp, tmp2);
868 }
869 }
870 if (cbytes < 0) {
871 ntbytes = cbytes; /* error in blosc_c or blosc_d */
872 break;
873 }
874 ntbytes += cbytes;
875 }
876
877 /* Free temporaries */
878 my_free(tmp);
879
880 return ntbytes;
881 }
882
883
884 /* Threaded version for compression/decompression */
parallel_blosc(struct blosc_context * context)885 static int parallel_blosc(struct blosc_context* context)
886 {
887 int rc;
888 (void)rc; // just to avoid 'unused-variable' warning
889
890 /* Check whether we need to restart threads */
891 blosc_set_nthreads_(context);
892
893 /* Set sentinels */
894 context->thread_giveup_code = 1;
895 context->thread_nblock = -1;
896
897 /* Synchronization point for all threads (wait for initialization) */
898 WAIT_INIT(-1, context);
899
900 /* Synchronization point for all threads (wait for finalization) */
901 WAIT_FINISH(-1, context);
902
903 if (context->thread_giveup_code > 0) {
904 /* Return the total bytes (de-)compressed in threads */
905 return context->num_output_bytes;
906 }
907 else {
908 /* Compression/decompression gave up. Return error code. */
909 return context->thread_giveup_code;
910 }
911 }
912
913
914 /* Do the compression or decompression of the buffer depending on the
915 global params. */
do_job(struct blosc_context * context)916 static int do_job(struct blosc_context* context)
917 {
918 int32_t ntbytes;
919
920 /* Run the serial version when nthreads is 1 or when the buffers are
921 not much larger than blocksize */
922 if (context->numthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
923 ntbytes = serial_blosc(context);
924 }
925 else {
926 ntbytes = parallel_blosc(context);
927 }
928
929 return ntbytes;
930 }
931
932
/* 1 when `codec` is one of the codecs meant for High Compression Ratios
   (LZ4HC, Zlib, Zstd), 0 otherwise.  The argument is evaluated more than
   once, so it must not have side effects. */
#define HCR(codec)                     \
  ( ( ((codec) == BLOSC_LZ4HC) ||      \
      ((codec) == BLOSC_ZLIB)  ||      \
      ((codec) == BLOSC_ZSTD) ) ? 1 : 0 )
938
939
940 /* Conditions for splitting a block before compressing with a codec. */
split_block(int compcode,int typesize,int blocksize)941 static int split_block(int compcode, int typesize, int blocksize) {
942 int splitblock = -1;
943
944 switch (g_splitmode) {
945 case BLOSC_ALWAYS_SPLIT:
946 splitblock = 1;
947 break;
948 case BLOSC_NEVER_SPLIT:
949 splitblock = 0;
950 break;
951 case BLOSC_AUTO_SPLIT:
952 /* Normally all the compressors designed for speed benefit from a
953 split. However, in conducted benchmarks LZ4 seems that it runs
954 faster if we don't split, which is quite surprising. */
955 splitblock= (((compcode == BLOSC_BLOSCLZ) ||
956 (compcode == BLOSC_SNAPPY)) &&
957 (typesize <= MAX_SPLITS) &&
958 (blocksize / typesize) >= MIN_BUFFERSIZE);
959 break;
960 case BLOSC_FORWARD_COMPAT_SPLIT:
961 /* The zstd support was introduced at the same time than the split flag, so
962 * there should be not a problem with not splitting bloscks with it */
963 splitblock = ((compcode != BLOSC_ZSTD) &&
964 (typesize <= MAX_SPLITS) &&
965 (blocksize / typesize) >= MIN_BUFFERSIZE);
966 break;
967 default:
968 fprintf(stderr, "Split mode %d not supported", g_splitmode);
969 }
970 return splitblock;
971 }
972
973
compute_blocksize(struct blosc_context * context,int32_t clevel,int32_t typesize,int32_t nbytes,int32_t forced_blocksize)974 static int32_t compute_blocksize(struct blosc_context* context, int32_t clevel,
975 int32_t typesize, int32_t nbytes,
976 int32_t forced_blocksize)
977 {
978 int32_t blocksize;
979
980 /* Protection against very small buffers */
981 if (nbytes < (int32_t)typesize) {
982 return 1;
983 }
984
985 blocksize = nbytes; /* Start by a whole buffer as blocksize */
986
987 if (forced_blocksize) {
988 blocksize = forced_blocksize;
989 /* Check that forced blocksize is not too small */
990 if (blocksize < MIN_BUFFERSIZE) {
991 blocksize = MIN_BUFFERSIZE;
992 }
993 /* Check that forced blocksize is not too large */
994 if (blocksize > BLOSC_MAX_BLOCKSIZE) {
995 blocksize = BLOSC_MAX_BLOCKSIZE;
996 }
997 }
998 else if (nbytes >= L1) {
999 blocksize = L1;
1000
1001 /* For HCR codecs, increase the block sizes by a factor of 2 because they
1002 are meant for compressing large blocks (i.e. they show a big overhead
1003 when compressing small ones). */
1004 if (HCR(context->compcode)) {
1005 blocksize *= 2;
1006 }
1007
1008 switch (clevel) {
1009 case 0:
1010 /* Case of plain copy */
1011 blocksize /= 4;
1012 break;
1013 case 1:
1014 blocksize /= 2;
1015 break;
1016 case 2:
1017 blocksize *= 1;
1018 break;
1019 case 3:
1020 blocksize *= 2;
1021 break;
1022 case 4:
1023 case 5:
1024 blocksize *= 4;
1025 break;
1026 case 6:
1027 case 7:
1028 case 8:
1029 blocksize *= 8;
1030 break;
1031 case 9:
1032 blocksize *= 8;
1033 if (HCR(context->compcode)) {
1034 blocksize *= 2;
1035 }
1036 break;
1037 default:
1038 assert(0);
1039 break;
1040 }
1041 }
1042
1043 /* Enlarge the blocksize for splittable codecs */
1044 if (clevel > 0 && split_block(context->compcode, typesize, blocksize)) {
1045 if (blocksize > (1 << 16)) {
1046 /* Do not use a too large split buffer (> 64 KB) for splitting codecs */
1047 blocksize = (1 << 16);
1048 }
1049 blocksize *= typesize;
1050 if (blocksize < (1 << 16)) {
1051 /* Do not use a too small blocksize (< 64 KB) when typesize is small */
1052 blocksize = (1 << 16);
1053 }
1054 }
1055
1056 /* Check that blocksize is not too large */
1057 if (blocksize > (int32_t)nbytes) {
1058 blocksize = nbytes;
1059 }
1060
1061 /* blocksize *must absolutely* be a multiple of the typesize */
1062 if (blocksize > typesize) {
1063 blocksize = blocksize / typesize * typesize;
1064 }
1065
1066 return blocksize;
1067 }
1068
initialize_context_compression(struct blosc_context * context,int clevel,int doshuffle,size_t typesize,size_t sourcesize,const void * src,void * dest,size_t destsize,int32_t compressor,int32_t blocksize,int32_t numthreads)1069 static int initialize_context_compression(struct blosc_context* context,
1070 int clevel,
1071 int doshuffle,
1072 size_t typesize,
1073 size_t sourcesize,
1074 const void* src,
1075 void* dest,
1076 size_t destsize,
1077 int32_t compressor,
1078 int32_t blocksize,
1079 int32_t numthreads)
1080 {
1081 /* Set parameters */
1082 context->compress = 1;
1083 context->src = (const uint8_t*)src;
1084 context->dest = (uint8_t *)(dest);
1085 context->num_output_bytes = 0;
1086 context->destsize = (int32_t)destsize;
1087 context->sourcesize = sourcesize;
1088 context->typesize = typesize;
1089 context->compcode = compressor;
1090 context->numthreads = numthreads;
1091 context->end_threads = 0;
1092 context->clevel = clevel;
1093
1094 /* Check buffer size limits */
1095 if (sourcesize > BLOSC_MAX_BUFFERSIZE) {
1096 fprintf(stderr, "Input buffer size cannot exceed %d bytes\n",
1097 BLOSC_MAX_BUFFERSIZE);
1098 return -1;
1099 }
1100 if (destsize < BLOSC_MAX_OVERHEAD) {
1101 fprintf(stderr, "Output buffer size should be larger than %d bytes\n",
1102 BLOSC_MAX_OVERHEAD);
1103 return -1;
1104 }
1105
1106 /* Compression level */
1107 if (clevel < 0 || clevel > 9) {
1108 /* If clevel not in 0..9, print an error */
1109 fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
1110 return -10;
1111 }
1112
1113 /* Shuffle */
1114 if (doshuffle != 0 && doshuffle != 1 && doshuffle != 2) {
1115 fprintf(stderr, "`shuffle` parameter must be either 0, 1 or 2!\n");
1116 return -10;
1117 }
1118
1119 /* Check typesize limits */
1120 if (context->typesize > BLOSC_MAX_TYPESIZE) {
1121 /* If typesize is too large, treat buffer as an 1-byte stream. */
1122 context->typesize = 1;
1123 }
1124
1125 /* Get the blocksize */
1126 context->blocksize = compute_blocksize(context, clevel, (int32_t)context->typesize, context->sourcesize, blocksize);
1127
1128 /* Compute number of blocks in buffer */
1129 context->nblocks = context->sourcesize / context->blocksize;
1130 context->leftover = context->sourcesize % context->blocksize;
1131 context->nblocks = (context->leftover > 0) ? (context->nblocks + 1) : context->nblocks;
1132
1133 return 1;
1134 }
1135
1136
write_compression_header(struct blosc_context * context,int clevel,int doshuffle)1137 static int write_compression_header(struct blosc_context* context, int clevel, int doshuffle)
1138 {
1139 int32_t compformat;
1140 int dont_split;
1141
1142 /* Write version header for this block */
1143 context->dest[0] = BLOSC_VERSION_FORMAT; /* blosc format version */
1144
1145 /* Write compressor format */
1146 compformat = -1;
1147 switch (context->compcode)
1148 {
1149 case BLOSC_BLOSCLZ:
1150 compformat = BLOSC_BLOSCLZ_FORMAT;
1151 context->dest[1] = BLOSC_BLOSCLZ_VERSION_FORMAT; /* blosclz format version */
1152 break;
1153
1154 #if defined(HAVE_LZ4)
1155 case BLOSC_LZ4:
1156 compformat = BLOSC_LZ4_FORMAT;
1157 context->dest[1] = BLOSC_LZ4_VERSION_FORMAT; /* lz4 format version */
1158 break;
1159 case BLOSC_LZ4HC:
1160 compformat = BLOSC_LZ4HC_FORMAT;
1161 context->dest[1] = BLOSC_LZ4HC_VERSION_FORMAT; /* lz4hc is the same as lz4 */
1162 break;
1163 #endif /* HAVE_LZ4 */
1164
1165 #if defined(HAVE_SNAPPY)
1166 case BLOSC_SNAPPY:
1167 compformat = BLOSC_SNAPPY_FORMAT;
1168 context->dest[1] = BLOSC_SNAPPY_VERSION_FORMAT; /* snappy format version */
1169 break;
1170 #endif /* HAVE_SNAPPY */
1171
1172 #if defined(HAVE_ZLIB)
1173 case BLOSC_ZLIB:
1174 compformat = BLOSC_ZLIB_FORMAT;
1175 context->dest[1] = BLOSC_ZLIB_VERSION_FORMAT; /* zlib format version */
1176 break;
1177 #endif /* HAVE_ZLIB */
1178
1179 #if defined(HAVE_ZSTD)
1180 case BLOSC_ZSTD:
1181 compformat = BLOSC_ZSTD_FORMAT;
1182 context->dest[1] = BLOSC_ZSTD_VERSION_FORMAT; /* zstd format version */
1183 break;
1184 #endif /* HAVE_ZSTD */
1185
1186 default:
1187 {
1188 const char *compname;
1189 compname = clibcode_to_clibname(compformat);
1190 fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
1191 fprintf(stderr, "compression support. Please use one having it.");
1192 return -5; /* signals no compression support */
1193 break;
1194 }
1195 }
1196
1197 context->header_flags = context->dest+2; /* flags */
1198 context->dest[2] = 0; /* zeroes flags */
1199 context->dest[3] = (uint8_t)context->typesize; /* type size */
1200 _sw32(context->dest + 4, context->sourcesize); /* size of the buffer */
1201 _sw32(context->dest + 8, context->blocksize); /* block size */
1202 context->bstarts = context->dest + 16; /* starts for every block */
1203 context->num_output_bytes = 16 + sizeof(int32_t)*context->nblocks; /* space for header and pointers */
1204
1205 if (context->clevel == 0) {
1206 /* Compression level 0 means buffer to be memcpy'ed */
1207 *(context->header_flags) |= BLOSC_MEMCPYED;
1208 context->num_output_bytes = 16; /* space just for header */
1209 }
1210
1211 if (context->sourcesize < MIN_BUFFERSIZE) {
1212 /* Buffer is too small. Try memcpy'ing. */
1213 *(context->header_flags) |= BLOSC_MEMCPYED;
1214 context->num_output_bytes = 16; /* space just for header */
1215 }
1216
1217 if (doshuffle == BLOSC_SHUFFLE) {
1218 /* Byte-shuffle is active */
1219 *(context->header_flags) |= BLOSC_DOSHUFFLE; /* bit 0 set to one in flags */
1220 }
1221
1222 if (doshuffle == BLOSC_BITSHUFFLE) {
1223 /* Bit-shuffle is active */
1224 *(context->header_flags) |= BLOSC_DOBITSHUFFLE; /* bit 2 set to one in flags */
1225 }
1226
1227 dont_split = !split_block(context->compcode, context->typesize,
1228 context->blocksize);
1229 *(context->header_flags) |= dont_split << 4; /* dont_split is in bit 4 */
1230 *(context->header_flags) |= compformat << 5; /* compressor format starts at bit 5 */
1231
1232 return 1;
1233 }
1234
1235
blosc_compress_context(struct blosc_context * context)1236 int blosc_compress_context(struct blosc_context* context)
1237 {
1238 int32_t ntbytes = 0;
1239
1240 if ((*(context->header_flags) & BLOSC_MEMCPYED) &&
1241 (context->sourcesize + BLOSC_MAX_OVERHEAD > context->destsize)) {
1242 return 0; /* data cannot be copied without overrun destination */
1243 }
1244
1245 /* Do the actual compression */
1246 ntbytes = do_job(context);
1247 if (ntbytes < 0) {
1248 return -1;
1249 }
1250 if ((ntbytes == 0) && (context->sourcesize + BLOSC_MAX_OVERHEAD <= context->destsize)) {
1251 /* Last chance for fitting `src` buffer in `dest`. Update flags and force a copy. */
1252 *(context->header_flags) |= BLOSC_MEMCPYED;
1253 context->num_output_bytes = BLOSC_MAX_OVERHEAD; /* reset the output bytes in previous step */
1254 ntbytes = do_job(context);
1255 if (ntbytes < 0) {
1256 return -1;
1257 }
1258 }
1259
1260 /* Set the number of compressed bytes in header */
1261 _sw32(context->dest + 12, ntbytes);
1262
1263 assert(ntbytes <= context->destsize);
1264 return ntbytes;
1265 }
1266
1267 /* The public routine for compression with context. */
blosc_compress_ctx(int clevel,int doshuffle,size_t typesize,size_t nbytes,const void * src,void * dest,size_t destsize,const char * compressor,size_t blocksize,int numinternalthreads)1268 int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize,
1269 size_t nbytes, const void* src, void* dest,
1270 size_t destsize, const char* compressor,
1271 size_t blocksize, int numinternalthreads)
1272 {
1273 int error, result;
1274 struct blosc_context context;
1275
1276 context.threads_started = 0;
1277 error = initialize_context_compression(&context, clevel, doshuffle, typesize,
1278 nbytes, src, dest, destsize,
1279 blosc_compname_to_compcode(compressor),
1280 blocksize, numinternalthreads);
1281 if (error < 0) { return error; }
1282
1283 error = write_compression_header(&context, clevel, doshuffle);
1284 if (error < 0) { return error; }
1285
1286 result = blosc_compress_context(&context);
1287
1288 if (numinternalthreads > 1)
1289 {
1290 blosc_release_threadpool(&context);
1291 }
1292
1293 return result;
1294 }
1295
1296 /* The public routine for compression. See blosc.h for docstrings. */
blosc_compress(int clevel,int doshuffle,size_t typesize,size_t nbytes,const void * src,void * dest,size_t destsize)1297 int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
1298 const void *src, void *dest, size_t destsize)
1299 {
1300 int result;
1301 char* envvar;
1302
1303 /* Check if should initialize */
1304 if (!g_initlib) blosc_init();
1305
1306 /* Check for environment variables */
1307 envvar = getenv("BLOSC_CLEVEL");
1308 if (envvar != NULL) {
1309 long value;
1310 value = strtol(envvar, NULL, 10);
1311 if ((value != EINVAL) && (value >= 0)) {
1312 clevel = (int)value;
1313 }
1314 }
1315
1316 envvar = getenv("BLOSC_SHUFFLE");
1317 if (envvar != NULL) {
1318 if (strcmp(envvar, "NOSHUFFLE") == 0) {
1319 doshuffle = BLOSC_NOSHUFFLE;
1320 }
1321 if (strcmp(envvar, "SHUFFLE") == 0) {
1322 doshuffle = BLOSC_SHUFFLE;
1323 }
1324 if (strcmp(envvar, "BITSHUFFLE") == 0) {
1325 doshuffle = BLOSC_BITSHUFFLE;
1326 }
1327 }
1328
1329 envvar = getenv("BLOSC_TYPESIZE");
1330 if (envvar != NULL) {
1331 long value;
1332 value = strtol(envvar, NULL, 10);
1333 if ((value != EINVAL) && (value > 0)) {
1334 typesize = (int)value;
1335 }
1336 }
1337
1338 envvar = getenv("BLOSC_COMPRESSOR");
1339 if (envvar != NULL) {
1340 result = blosc_set_compressor(envvar);
1341 if (result < 0) { return result; }
1342 }
1343
1344 envvar = getenv("BLOSC_BLOCKSIZE");
1345 if (envvar != NULL) {
1346 long blocksize;
1347 blocksize = strtol(envvar, NULL, 10);
1348 if ((blocksize != EINVAL) && (blocksize > 0)) {
1349 blosc_set_blocksize((size_t)blocksize);
1350 }
1351 }
1352
1353 envvar = getenv("BLOSC_NTHREADS");
1354 if (envvar != NULL) {
1355 long nthreads;
1356 nthreads = strtol(envvar, NULL, 10);
1357 if ((nthreads != EINVAL) && (nthreads > 0)) {
1358 result = blosc_set_nthreads((int)nthreads);
1359 if (result < 0) { return result; }
1360 }
1361 }
1362
1363 envvar = getenv("BLOSC_SPLITMODE");
1364 if (envvar != NULL) {
1365 if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
1366 blosc_set_splitmode(BLOSC_FORWARD_COMPAT_SPLIT);
1367 }
1368 else if (strcmp(envvar, "AUTO") == 0) {
1369 blosc_set_splitmode(BLOSC_AUTO_SPLIT);
1370 }
1371 else if (strcmp(envvar, "ALWAYS") == 0) {
1372 blosc_set_splitmode(BLOSC_ALWAYS_SPLIT);
1373 }
1374 else if (strcmp(envvar, "NEVER") == 0) {
1375 blosc_set_splitmode(BLOSC_NEVER_SPLIT);
1376 }
1377 else {
1378 fprintf(stderr, "BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
1379 return -1;
1380 }
1381 }
1382
1383 /* Check for a BLOSC_NOLOCK environment variable. It is important
1384 that this should be the last env var so that it can take the
1385 previous ones into account */
1386 envvar = getenv("BLOSC_NOLOCK");
1387 if (envvar != NULL) {
1388 const char *compname;
1389 blosc_compcode_to_compname(g_compressor, &compname);
1390 result = blosc_compress_ctx(clevel, doshuffle, typesize,
1391 nbytes, src, dest, destsize,
1392 compname, g_force_blocksize, g_threads);
1393 return result;
1394 }
1395
1396 pthread_mutex_lock(global_comp_mutex);
1397
1398 do {
1399 result = initialize_context_compression(g_global_context, clevel, doshuffle,
1400 typesize, nbytes, src, dest, destsize,
1401 g_compressor, g_force_blocksize,
1402 g_threads);
1403 if (result < 0) { break; }
1404
1405 result = write_compression_header(g_global_context, clevel, doshuffle);
1406 if (result < 0) { break; }
1407
1408 result = blosc_compress_context(g_global_context);
1409 } while (0);
1410
1411 pthread_mutex_unlock(global_comp_mutex);
1412
1413 return result;
1414 }
1415
blosc_run_decompression_with_context(struct blosc_context * context,const void * src,void * dest,size_t destsize,int numinternalthreads,int unsafe)1416 static int blosc_run_decompression_with_context(struct blosc_context* context,
1417 const void* src,
1418 void* dest,
1419 size_t destsize,
1420 int numinternalthreads,
1421 int unsafe)
1422 {
1423 uint8_t version;
1424 int32_t ntbytes;
1425
1426 context->compress = 0;
1427 context->src = (const uint8_t*)src;
1428 context->dest = (uint8_t*)dest;
1429 context->destsize = destsize;
1430 context->num_output_bytes = 0;
1431 context->numthreads = numinternalthreads;
1432 context->end_threads = 0;
1433
1434 /* Read the header block */
1435 version = context->src[0]; /* blosc format version */
1436 context->compversion = context->src[1];
1437
1438 context->header_flags = (uint8_t*)(context->src + 2); /* flags */
1439 context->typesize = (int32_t)context->src[3]; /* typesize */
1440 context->sourcesize = sw32_(context->src + 4); /* buffer size */
1441 context->blocksize = sw32_(context->src + 8); /* block size */
1442 context->compressedsize = sw32_(context->src + 12); /* compressed buffer size */
1443 context->bstarts = (uint8_t*)(context->src + 16);
1444
1445 if (context->sourcesize == 0) {
1446 /* Source buffer was empty, so we are done */
1447 return 0;
1448 }
1449
1450 if (context->blocksize <= 0 || context->blocksize > destsize ||
1451 context->blocksize > BLOSC_MAX_BLOCKSIZE || context->typesize <= 0 ||
1452 context->typesize > BLOSC_MAX_TYPESIZE) {
1453 return -1;
1454 }
1455
1456 if (version != BLOSC_VERSION_FORMAT) {
1457 /* Version from future */
1458 return -1;
1459 }
1460 if (*context->header_flags & 0x08) {
1461 /* compressor flags from the future */
1462 return -1;
1463 }
1464
1465 /* Compute some params */
1466 /* Total blocks */
1467 context->nblocks = context->sourcesize / context->blocksize;
1468 context->leftover = context->sourcesize % context->blocksize;
1469 context->nblocks = (context->leftover>0)? context->nblocks+1: context->nblocks;
1470
1471 /* Check that we have enough space to decompress */
1472 if (context->sourcesize > (int32_t)destsize) {
1473 return -1;
1474 }
1475
1476 if (*(context->header_flags) & BLOSC_MEMCPYED) {
1477 /* Validate that compressed size is equal to decompressed size + header
1478 size. */
1479 if (context->sourcesize + BLOSC_MAX_OVERHEAD != context->compressedsize) {
1480 return -1;
1481 }
1482 } else {
1483 ntbytes = initialize_decompress_func(context, unsafe);
1484 if (ntbytes != 0) return ntbytes;
1485
1486 /* Validate that compressed size is large enough to hold the bstarts array */
1487 if (context->nblocks > (context->compressedsize - 16) / 4) {
1488 return -1;
1489 }
1490 }
1491
1492 /* Do the actual decompression */
1493 ntbytes = do_job(context);
1494 if (ntbytes < 0) {
1495 return -1;
1496 }
1497
1498 assert(ntbytes <= (int32_t)destsize);
1499 return ntbytes;
1500 }
1501
1502 /* Implementation of blosc_decompress_ctx{,_unsafe}. */
blosc_decompress_ctx_impl(const void * src,void * dest,size_t destsize,int numinternalthreads,int unsafe)1503 static int blosc_decompress_ctx_impl(const void* src, void* dest,
1504 size_t destsize, int numinternalthreads,
1505 int unsafe) {
1506 int result;
1507 struct blosc_context context;
1508
1509 context.threads_started = 0;
1510 result = blosc_run_decompression_with_context(&context, src, dest, destsize,
1511 numinternalthreads, unsafe);
1512
1513 if (numinternalthreads > 1)
1514 {
1515 blosc_release_threadpool(&context);
1516 }
1517
1518 return result;
1519 }
1520
/* Context-based decompression: forwards to blosc_decompress_ctx_impl() with
   the `unsafe` flag cleared. */
int blosc_decompress_ctx(const void* src, void* dest, size_t destsize,
                         int numinternalthreads) {
  const int unsafe = 0;

  return blosc_decompress_ctx_impl(src, dest, destsize, numinternalthreads,
                                   unsafe);
}
1526
/* Context-based decompression: forwards to blosc_decompress_ctx_impl() with
   the `unsafe` flag set. */
int blosc_decompress_ctx_unsafe(const void* src, void* dest, size_t destsize,
                                int numinternalthreads) {
  const int unsafe = 1;

  return blosc_decompress_ctx_impl(src, dest, destsize, numinternalthreads,
                                   unsafe);
}
1532
1533 /* Implementation of blosc_decompress{,_unsafe}. */
blosc_decompress_impl(const void * src,void * dest,size_t destsize,int unsafe)1534 static int blosc_decompress_impl(const void* src, void* dest,
1535 size_t destsize, int unsafe) {
1536 int result;
1537 char* envvar;
1538 long nthreads;
1539
1540 /* Check if should initialize */
1541 if (!g_initlib) blosc_init();
1542
1543 /* Check for a BLOSC_NTHREADS environment variable */
1544 envvar = getenv("BLOSC_NTHREADS");
1545 if (envvar != NULL) {
1546 nthreads = strtol(envvar, NULL, 10);
1547 if ((nthreads != EINVAL) && (nthreads > 0)) {
1548 result = blosc_set_nthreads((int)nthreads);
1549 if (result < 0) { return result; }
1550 }
1551 }
1552
1553 /* Check for a BLOSC_NOLOCK environment variable. It is important
1554 that this should be the last env var so that it can take the
1555 previous ones into account */
1556 envvar = getenv("BLOSC_NOLOCK");
1557 if (envvar != NULL) {
1558 result = blosc_decompress_ctx(src, dest, destsize, g_threads);
1559 return result;
1560 }
1561
1562 pthread_mutex_lock(global_comp_mutex);
1563
1564 result = blosc_run_decompression_with_context(g_global_context, src, dest,
1565 destsize, g_threads, unsafe);
1566
1567 pthread_mutex_unlock(global_comp_mutex);
1568
1569 return result;
1570 }
1571
/* The public routine for decompression.  See blosc.h for docstrings.
   Forwards to blosc_decompress_impl() with the `unsafe` flag cleared. */
int blosc_decompress(const void *src, void *dest, size_t destsize) {
  const int unsafe = 0;

  return blosc_decompress_impl(src, dest, destsize, unsafe);
}
1576
/* Same as blosc_decompress(), but forwards to blosc_decompress_impl() with
   the `unsafe` flag set. */
int blosc_decompress_unsafe(const void *src, void *dest, size_t destsize) {
  const int unsafe = 1;

  return blosc_decompress_impl(src, dest, destsize, unsafe);
}
1580
1581
1582 /* Implementation of blosc_getitem{,_unsafe}. */
blosc_getitem_impl(const void * src,int start,int nitems,void * dest,int unsafe)1583 static int blosc_getitem_impl(const void* src, int start, int nitems,
1584 void* dest, int unsafe) {
1585 uint8_t *_src=NULL; /* current pos for source buffer */
1586 uint8_t version, compversion; /* versions for compressed header */
1587 uint8_t flags; /* flags for header */
1588 int32_t ntbytes = 0; /* the number of uncompressed bytes */
1589 int32_t nblocks; /* number of total blocks in buffer */
1590 int32_t leftover; /* extra bytes at end of buffer */
1591 uint8_t *bstarts; /* start pointers for each block */
1592 int32_t typesize, blocksize, nbytes, compressedsize;
1593 int32_t j, bsize, bsize2, leftoverblock;
1594 int32_t cbytes, startb, stopb;
1595 int stop = start + nitems;
1596 uint8_t *tmp;
1597 uint8_t *tmp2;
1598 uint8_t *tmp3;
1599 int32_t ebsize;
1600 struct blosc_context context = {0};
1601
1602 _src = (uint8_t *)(src);
1603
1604 /* Read the header block */
1605 version = _src[0]; /* blosc format version */
1606 compversion = _src[1];
1607 flags = _src[2]; /* flags */
1608 typesize = (int32_t)_src[3]; /* typesize */
1609 nbytes = sw32_(_src + 4); /* buffer size */
1610 blocksize = sw32_(_src + 8); /* block size */
1611 compressedsize = sw32_(_src + 12); /* compressed buffer size */
1612
1613 if (version != BLOSC_VERSION_FORMAT)
1614 return -9;
1615
1616 if (blocksize <= 0 || blocksize > nbytes || blocksize > BLOSC_MAX_BLOCKSIZE ||
1617 typesize <= 0 || typesize > BLOSC_MAX_TYPESIZE) {
1618 return -1;
1619 }
1620
1621 /* Compute some params */
1622 /* Total blocks */
1623 nblocks = nbytes / blocksize;
1624 leftover = nbytes % blocksize;
1625 nblocks = (leftover>0)? nblocks+1: nblocks;
1626
1627 /* Only initialize the fields blosc_d uses */
1628 context.typesize = typesize;
1629 context.header_flags = &flags;
1630 context.compversion = compversion;
1631 context.compressedsize = compressedsize;
1632 if (flags & BLOSC_MEMCPYED) {
1633 if (nbytes + BLOSC_MAX_OVERHEAD != compressedsize) {
1634 return -1;
1635 }
1636 } else {
1637 ntbytes = initialize_decompress_func(&context, /*unsafe=*/unsafe);
1638 if (ntbytes != 0) return ntbytes;
1639
1640 if (nblocks >= (compressedsize - 16) / 4) {
1641 return -1;
1642 }
1643 }
1644
1645 ebsize = blocksize + typesize * (int32_t)sizeof(int32_t);
1646 tmp = my_malloc(blocksize + ebsize + blocksize);
1647 tmp2 = tmp + blocksize;
1648 tmp3 = tmp + blocksize + ebsize;
1649
1650 _src += 16;
1651 bstarts = _src;
1652 _src += sizeof(int32_t)*nblocks;
1653
1654 /* Check region boundaries */
1655 if ((start < 0) || (start*typesize > nbytes)) {
1656 fprintf(stderr, "`start` out of bounds");
1657 return -1;
1658 }
1659
1660 if ((stop < 0) || (stop*typesize > nbytes)) {
1661 fprintf(stderr, "`start`+`nitems` out of bounds");
1662 return -1;
1663 }
1664
1665 for (j = 0; j < nblocks; j++) {
1666 bsize = blocksize;
1667 leftoverblock = 0;
1668 if ((j == nblocks - 1) && (leftover > 0)) {
1669 bsize = leftover;
1670 leftoverblock = 1;
1671 }
1672
1673 /* Compute start & stop for each block */
1674 startb = start * typesize - j * blocksize;
1675 stopb = stop * typesize - j * blocksize;
1676 if ((startb >= (int)blocksize) || (stopb <= 0)) {
1677 continue;
1678 }
1679 if (startb < 0) {
1680 startb = 0;
1681 }
1682 if (stopb > (int)blocksize) {
1683 stopb = blocksize;
1684 }
1685 bsize2 = stopb - startb;
1686
1687 /* Do the actual data copy */
1688 if (flags & BLOSC_MEMCPYED) {
1689 /* We want to memcpy only */
1690 blosc_internal_fastcopy((uint8_t *) dest + ntbytes,
1691 (uint8_t *) src + BLOSC_MAX_OVERHEAD + j * blocksize + startb, bsize2);
1692 cbytes = bsize2;
1693 }
1694 else {
1695 /* Regular decompression. Put results in tmp2. */
1696 cbytes = blosc_d(&context, bsize, leftoverblock,
1697 (uint8_t *)src, sw32_(bstarts + j * 4),
1698 tmp2, tmp, tmp3);
1699 if (cbytes < 0) {
1700 ntbytes = cbytes;
1701 break;
1702 }
1703 /* Copy to destination */
1704 blosc_internal_fastcopy((uint8_t *) dest + ntbytes, tmp2 + startb, bsize2);
1705 cbytes = bsize2;
1706 }
1707 ntbytes += cbytes;
1708 }
1709
1710 my_free(tmp);
1711
1712 return ntbytes;
1713 }
1714
/* Specific routine optimized for decompressing a small number of items out
   of a compressed chunk.  This does not use threads because that would hurt
   performance.  Forwards to blosc_getitem_impl() with the `unsafe` flag
   cleared. */
int blosc_getitem(const void *src, int start, int nitems, void *dest) {
  const int unsafe = 0;

  return blosc_getitem_impl(src, start, nitems, dest, unsafe);
}
1721
/* Same as blosc_getitem(), but forwards to blosc_getitem_impl() with the
   `unsafe` flag set. */
int blosc_getitem_unsafe(const void *src, int start, int nitems, void *dest) {
  const int unsafe = 1;

  return blosc_getitem_impl(src, start, nitems, dest, unsafe);
}
1725
1726 /* Decompress & unshuffle several blocks in a single thread */
t_blosc(void * ctxt)1727 static void *t_blosc(void *ctxt)
1728 {
1729 struct thread_context* context = (struct thread_context*)ctxt;
1730 int32_t cbytes, ntdest;
1731 int32_t tblocks; /* number of blocks per thread */
1732 int32_t leftover2;
1733 int32_t tblock; /* limit block on a thread */
1734 int32_t nblock_; /* private copy of nblock */
1735 int32_t bsize, leftoverblock;
1736 /* Parameters for threads */
1737 int32_t blocksize;
1738 int32_t ebsize;
1739 int32_t compress;
1740 int32_t maxbytes;
1741 int32_t ntbytes;
1742 int32_t flags;
1743 int32_t nblocks;
1744 int32_t leftover;
1745 uint8_t *bstarts;
1746 const uint8_t *src;
1747 uint8_t *dest;
1748 uint8_t *tmp;
1749 uint8_t *tmp2;
1750 uint8_t *tmp3;
1751 int rc;
1752 (void)rc; // just to avoid 'unused-variable' warning
1753
1754 while(1)
1755 {
1756 /* Synchronization point for all threads (wait for initialization) */
1757 WAIT_INIT(NULL, context->parent_context);
1758
1759 if(context->parent_context->end_threads)
1760 {
1761 break;
1762 }
1763
1764 /* Get parameters for this thread before entering the main loop */
1765 blocksize = context->parent_context->blocksize;
1766 ebsize = blocksize + context->parent_context->typesize * (int32_t)sizeof(int32_t);
1767 compress = context->parent_context->compress;
1768 flags = *(context->parent_context->header_flags);
1769 maxbytes = context->parent_context->destsize;
1770 nblocks = context->parent_context->nblocks;
1771 leftover = context->parent_context->leftover;
1772 bstarts = context->parent_context->bstarts;
1773 src = context->parent_context->src;
1774 dest = context->parent_context->dest;
1775
1776 if (blocksize > context->tmpblocksize)
1777 {
1778 my_free(context->tmp);
1779 context->tmp = my_malloc(blocksize + ebsize + blocksize);
1780 context->tmp2 = context->tmp + blocksize;
1781 context->tmp3 = context->tmp + blocksize + ebsize;
1782 }
1783
1784 tmp = context->tmp;
1785 tmp2 = context->tmp2;
1786 tmp3 = context->tmp3;
1787
1788 ntbytes = 0; /* only useful for decompression */
1789
1790 if (compress && !(flags & BLOSC_MEMCPYED)) {
1791 /* Compression always has to follow the block order */
1792 pthread_mutex_lock(&context->parent_context->count_mutex);
1793 context->parent_context->thread_nblock++;
1794 nblock_ = context->parent_context->thread_nblock;
1795 pthread_mutex_unlock(&context->parent_context->count_mutex);
1796 tblock = nblocks;
1797 }
1798 else {
1799 /* Decompression can happen using any order. We choose
1800 sequential block order on each thread */
1801
1802 /* Blocks per thread */
1803 tblocks = nblocks / context->parent_context->numthreads;
1804 leftover2 = nblocks % context->parent_context->numthreads;
1805 tblocks = (leftover2>0)? tblocks+1: tblocks;
1806
1807 nblock_ = context->tid*tblocks;
1808 tblock = nblock_ + tblocks;
1809 if (tblock > nblocks) {
1810 tblock = nblocks;
1811 }
1812 }
1813
1814 /* Loop over blocks */
1815 leftoverblock = 0;
1816 while ((nblock_ < tblock) && context->parent_context->thread_giveup_code > 0) {
1817 bsize = blocksize;
1818 if (nblock_ == (nblocks - 1) && (leftover > 0)) {
1819 bsize = leftover;
1820 leftoverblock = 1;
1821 }
1822 if (compress) {
1823 if (flags & BLOSC_MEMCPYED) {
1824 /* We want to memcpy only */
1825 blosc_internal_fastcopy(dest + BLOSC_MAX_OVERHEAD + nblock_ * blocksize, src + nblock_ * blocksize,
1826 bsize);
1827 cbytes = bsize;
1828 }
1829 else {
1830 /* Regular compression */
1831 cbytes = blosc_c(context->parent_context, bsize, leftoverblock, 0, ebsize,
1832 src+nblock_*blocksize, tmp2, tmp, tmp3);
1833 }
1834 }
1835 else {
1836 if (flags & BLOSC_MEMCPYED) {
1837 /* We want to memcpy only */
1838 blosc_internal_fastcopy(dest + nblock_ * blocksize, src + BLOSC_MAX_OVERHEAD + nblock_ * blocksize,
1839 bsize);
1840 cbytes = bsize;
1841 }
1842 else {
1843 cbytes = blosc_d(context->parent_context, bsize, leftoverblock,
1844 src, sw32_(bstarts + nblock_ * 4),
1845 dest+nblock_*blocksize,
1846 tmp, tmp2);
1847 }
1848 }
1849
1850 /* Check whether current thread has to giveup */
1851 if (context->parent_context->thread_giveup_code <= 0) {
1852 break;
1853 }
1854
1855 /* Check results for the compressed/decompressed block */
1856 if (cbytes < 0) { /* compr/decompr failure */
1857 /* Set giveup_code error */
1858 pthread_mutex_lock(&context->parent_context->count_mutex);
1859 context->parent_context->thread_giveup_code = cbytes;
1860 pthread_mutex_unlock(&context->parent_context->count_mutex);
1861 break;
1862 }
1863
1864 if (compress && !(flags & BLOSC_MEMCPYED)) {
1865 /* Start critical section */
1866 pthread_mutex_lock(&context->parent_context->count_mutex);
1867 ntdest = context->parent_context->num_output_bytes;
1868 _sw32(bstarts + nblock_ * 4, ntdest); /* update block start counter */
1869 if ( (cbytes == 0) || (ntdest+cbytes > maxbytes) ) {
1870 context->parent_context->thread_giveup_code = 0; /* uncompressible buffer */
1871 pthread_mutex_unlock(&context->parent_context->count_mutex);
1872 break;
1873 }
1874 context->parent_context->thread_nblock++;
1875 nblock_ = context->parent_context->thread_nblock;
1876 context->parent_context->num_output_bytes += cbytes; /* update return bytes counter */
1877 pthread_mutex_unlock(&context->parent_context->count_mutex);
1878 /* End of critical section */
1879
1880 /* Copy the compressed buffer to destination */
1881 blosc_internal_fastcopy(dest + ntdest, tmp2, cbytes);
1882 }
1883 else {
1884 nblock_++;
1885 /* Update counter for this thread */
1886 ntbytes += cbytes;
1887 }
1888
1889 } /* closes while (nblock_) */
1890
1891 /* Sum up all the bytes decompressed */
1892 if ((!compress || (flags & BLOSC_MEMCPYED)) && context->parent_context->thread_giveup_code > 0) {
1893 /* Update global counter for all threads (decompression only) */
1894 pthread_mutex_lock(&context->parent_context->count_mutex);
1895 context->parent_context->num_output_bytes += ntbytes;
1896 pthread_mutex_unlock(&context->parent_context->count_mutex);
1897 }
1898
1899 /* Meeting point for all threads (wait for finalization) */
1900 WAIT_FINISH(NULL, context->parent_context);
1901 }
1902
1903 /* Cleanup our working space and context */
1904 my_free(context->tmp);
1905 my_free(context);
1906
1907 return(NULL);
1908 }
1909
1910
init_threads(struct blosc_context * context)1911 static int init_threads(struct blosc_context* context)
1912 {
1913 int32_t tid;
1914 int rc2;
1915 int32_t ebsize;
1916 struct thread_context* thread_context;
1917
1918 /* Initialize mutex and condition variable objects */
1919 pthread_mutex_init(&context->count_mutex, NULL);
1920
1921 /* Set context thread sentinels */
1922 context->thread_giveup_code = 1;
1923 context->thread_nblock = -1;
1924
1925 /* Barrier initialization */
1926 #ifdef _POSIX_BARRIERS_MINE
1927 pthread_barrier_init(&context->barr_init, NULL, context->numthreads+1);
1928 pthread_barrier_init(&context->barr_finish, NULL, context->numthreads+1);
1929 #else
1930 pthread_mutex_init(&context->count_threads_mutex, NULL);
1931 pthread_cond_init(&context->count_threads_cv, NULL);
1932 context->count_threads = 0; /* Reset threads counter */
1933 #endif
1934
1935 #if !defined(_WIN32)
1936 /* Initialize and set thread detached attribute */
1937 pthread_attr_init(&context->ct_attr);
1938 pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
1939 #endif
1940
1941 /* Finally, create the threads in detached state */
1942 for (tid = 0; tid < context->numthreads; tid++) {
1943 context->tids[tid] = tid;
1944
1945 /* Create a thread context thread owns context (will destroy when finished) */
1946 thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
1947 thread_context->parent_context = context;
1948 thread_context->tid = tid;
1949
1950 ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
1951 thread_context->tmp = my_malloc(context->blocksize + ebsize + context->blocksize);
1952 thread_context->tmp2 = thread_context->tmp + context->blocksize;
1953 thread_context->tmp3 = thread_context->tmp + context->blocksize + ebsize;
1954 thread_context->tmpblocksize = context->blocksize;
1955
1956 #if !defined(_WIN32)
1957 rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc, (void *)thread_context);
1958 #else
1959 rc2 = pthread_create(&context->threads[tid], NULL, t_blosc, (void *)thread_context);
1960 #endif
1961 if (rc2) {
1962 fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc2);
1963 fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
1964 return(-1);
1965 }
1966 }
1967
1968
1969 return(0);
1970 }
1971
blosc_get_nthreads(void)1972 int blosc_get_nthreads(void)
1973 {
1974 int ret = g_threads;
1975
1976 return ret;
1977 }
1978
blosc_set_nthreads(int nthreads_new)1979 int blosc_set_nthreads(int nthreads_new)
1980 {
1981 int ret = g_threads;
1982
1983 /* Check if should initialize */
1984 if (!g_initlib) blosc_init();
1985
1986 if (nthreads_new != ret){
1987 /* Re-initialize Blosc */
1988 blosc_destroy();
1989 blosc_init();
1990 g_threads = nthreads_new;
1991 }
1992
1993 return ret;
1994 }
1995
blosc_set_nthreads_(struct blosc_context * context)1996 int blosc_set_nthreads_(struct blosc_context* context)
1997 {
1998 if (context->numthreads > BLOSC_MAX_THREADS) {
1999 fprintf(stderr,
2000 "Error. nthreads cannot be larger than BLOSC_MAX_THREADS (%d)",
2001 BLOSC_MAX_THREADS);
2002 return -1;
2003 }
2004 else if (context->numthreads <= 0) {
2005 fprintf(stderr, "Error. nthreads must be a positive integer");
2006 return -1;
2007 }
2008
2009 /* Launch a new pool of threads */
2010 if (context->numthreads > 1 && context->numthreads != context->threads_started) {
2011 blosc_release_threadpool(context);
2012 init_threads(context);
2013 }
2014
2015 /* We have now started the threads */
2016 context->threads_started = context->numthreads;
2017
2018 return context->numthreads;
2019 }
2020
blosc_get_compressor(void)2021 const char* blosc_get_compressor(void)
2022 {
2023 const char* compname;
2024 blosc_compcode_to_compname(g_compressor, &compname);
2025
2026 return compname;
2027 }
2028
blosc_set_compressor(const char * compname)2029 int blosc_set_compressor(const char *compname)
2030 {
2031 int code = blosc_compname_to_compcode(compname);
2032
2033 g_compressor = code;
2034
2035 /* Check if should initialize */
2036 if (!g_initlib) blosc_init();
2037
2038 return code;
2039 }
2040
blosc_list_compressors(void)2041 const char* blosc_list_compressors(void)
2042 {
2043 static int compressors_list_done = 0;
2044 static char ret[256];
2045
2046 if (compressors_list_done) return ret;
2047 ret[0] = '\0';
2048 strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
2049 #if defined(HAVE_LZ4)
2050 strcat(ret, ","); strcat(ret, BLOSC_LZ4_COMPNAME);
2051 strcat(ret, ","); strcat(ret, BLOSC_LZ4HC_COMPNAME);
2052 #endif /* HAVE_LZ4 */
2053 #if defined(HAVE_SNAPPY)
2054 strcat(ret, ","); strcat(ret, BLOSC_SNAPPY_COMPNAME);
2055 #endif /* HAVE_SNAPPY */
2056 #if defined(HAVE_ZLIB)
2057 strcat(ret, ","); strcat(ret, BLOSC_ZLIB_COMPNAME);
2058 #endif /* HAVE_ZLIB */
2059 #if defined(HAVE_ZSTD)
2060 strcat(ret, ","); strcat(ret, BLOSC_ZSTD_COMPNAME);
2061 #endif /* HAVE_ZSTD */
2062 compressors_list_done = 1;
2063 return ret;
2064 }
2065
blosc_get_version_string(void)2066 const char* blosc_get_version_string(void)
2067 {
2068 return BLOSC_VERSION_STRING;
2069 }
2070
blosc_get_complib_info(const char * compname,char ** complib,char ** version)2071 int blosc_get_complib_info(const char *compname, char **complib, char **version)
2072 {
2073 int clibcode;
2074 const char *clibname;
2075 const char *clibversion = "unknown";
2076
2077 #if (defined(HAVE_LZ4) && defined(LZ4_VERSION_MAJOR)) || (defined(HAVE_SNAPPY) && defined(SNAPPY_VERSION)) || defined(ZSTD_VERSION_MAJOR)
2078 char sbuffer[256];
2079 #endif
2080
2081 clibcode = compname_to_clibcode(compname);
2082 clibname = clibcode_to_clibname(clibcode);
2083
2084 /* complib version */
2085 if (clibcode == BLOSC_BLOSCLZ_LIB) {
2086 clibversion = BLOSCLZ_VERSION_STRING;
2087 }
2088 #if defined(HAVE_LZ4)
2089 else if (clibcode == BLOSC_LZ4_LIB) {
2090 #if defined(LZ4_VERSION_MAJOR)
2091 sprintf(sbuffer, "%d.%d.%d",
2092 LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
2093 clibversion = sbuffer;
2094 #endif /* LZ4_VERSION_MAJOR */
2095 }
2096 #endif /* HAVE_LZ4 */
2097 #if defined(HAVE_SNAPPY)
2098 else if (clibcode == BLOSC_SNAPPY_LIB) {
2099 #if defined(SNAPPY_VERSION)
2100 sprintf(sbuffer, "%d.%d.%d", SNAPPY_MAJOR, SNAPPY_MINOR, SNAPPY_PATCHLEVEL);
2101 clibversion = sbuffer;
2102 #endif /* SNAPPY_VERSION */
2103 }
2104 #endif /* HAVE_SNAPPY */
2105 #if defined(HAVE_ZLIB)
2106 else if (clibcode == BLOSC_ZLIB_LIB) {
2107 clibversion = ZLIB_VERSION;
2108 }
2109 #endif /* HAVE_ZLIB */
2110 #if defined(HAVE_ZSTD)
2111 else if (clibcode == BLOSC_ZSTD_LIB) {
2112 sprintf(sbuffer, "%d.%d.%d",
2113 ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
2114 clibversion = sbuffer;
2115 }
2116 #endif /* HAVE_ZSTD */
2117 else {
2118 /* Unsupported library */
2119 if (complib != NULL) *complib = NULL;
2120 if (version != NULL) *version = NULL;
2121 return -1;
2122 }
2123
2124 if (complib != NULL) *complib = strdup(clibname);
2125 if (version != NULL) *version = strdup(clibversion);
2126
2127 return clibcode;
2128 }
2129
2130 /* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */
blosc_cbuffer_sizes(const void * cbuffer,size_t * nbytes,size_t * cbytes,size_t * blocksize)2131 void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes,
2132 size_t *cbytes, size_t *blocksize)
2133 {
2134 uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */
2135 uint8_t version = _src[0]; /* version of header */
2136
2137 if (version != BLOSC_VERSION_FORMAT) {
2138 *nbytes = *blocksize = *cbytes = 0;
2139 return;
2140 }
2141
2142 /* Read the interesting values */
2143 *nbytes = (size_t)sw32_(_src + 4); /* uncompressed buffer size */
2144 *blocksize = (size_t)sw32_(_src + 8); /* block size */
2145 *cbytes = (size_t)sw32_(_src + 12); /* compressed buffer size */
2146 }
2147
blosc_cbuffer_validate(const void * cbuffer,size_t cbytes,size_t * nbytes)2148 int blosc_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) {
2149 size_t header_cbytes, header_blocksize;
2150 if (cbytes < BLOSC_MIN_HEADER_LENGTH) return -1;
2151 blosc_cbuffer_sizes(cbuffer, nbytes, &header_cbytes, &header_blocksize);
2152 if (header_cbytes != cbytes) return -1;
2153 if (*nbytes > BLOSC_MAX_BUFFERSIZE) return -1;
2154 return 0;
2155 }
2156
2157 /* Return `typesize` and `flags` from a compressed buffer. */
blosc_cbuffer_metainfo(const void * cbuffer,size_t * typesize,int * flags)2158 void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize,
2159 int *flags)
2160 {
2161 uint8_t *_src = (uint8_t *)(cbuffer); /* current pos for source buffer */
2162
2163 uint8_t version = _src[0]; /* version of header */
2164
2165 if (version != BLOSC_VERSION_FORMAT) {
2166 *flags = *typesize = 0;
2167 return;
2168 }
2169
2170 /* Read the interesting values */
2171 *flags = (int)_src[2] & 7; /* first three flags */
2172 *typesize = (size_t)_src[3]; /* typesize */
2173 }
2174
2175
/* Extract version information from the header of a compressed buffer:
 * `*version` receives header byte 0 (blosc format version) and `*versionlz`
 * receives header byte 1 (compressor format version). */
void blosc_cbuffer_versions(const void *cbuffer, int *version,
                            int *versionlz)
{
  const uint8_t *header = (const uint8_t *)cbuffer;

  *version = (int)header[0];
  *versionlz = (int)header[1];
}
2186
2187
/* Return the name of the compressor library/format recorded in a compressed
 * buffer.  The library code is stored in the three highest bits of header
 * byte 2 and is translated with clibcode_to_clibname(). */
const char *blosc_cbuffer_complib(const void *cbuffer)
{
  const uint8_t *header = (const uint8_t *)cbuffer;
  const int libcode = (header[2] >> 5) & 0x7;

  return clibcode_to_clibname(libcode);
}
2200
2201 /* Get the internal blocksize to be used during compression. 0 means
2202 that an automatic blocksize is computed internally. */
blosc_get_blocksize(void)2203 int blosc_get_blocksize(void)
2204 {
2205 return (int)g_force_blocksize;
2206 }
2207
2208 /* Force the use of a specific blocksize. If 0, an automatic
2209 blocksize will be used (the default). */
blosc_set_blocksize(size_t size)2210 void blosc_set_blocksize(size_t size)
2211 {
2212 g_force_blocksize = (int32_t)size;
2213 }
2214
2215 /* Force the use of a specific split mode. */
blosc_set_splitmode(int mode)2216 void blosc_set_splitmode(int mode)
2217 {
2218 g_splitmode = mode;
2219 }
2220
2221 /* Child global context is invalid and pool threads no longer exist post-fork.
2222 * Discard the old, inconsistent global context and global context mutex and
2223 * mark as uninitialized. Subsequent calls through `blosc_*` interfaces will
2224 * trigger re-init of the global context.
2225 *
2226 * All pthread interfaces have undefined behavior in child handler in current
2227 * posix standards: http://pubs.opengroup.org/onlinepubs/9699919799/
2228 */
blosc_atfork_child(void)2229 void blosc_atfork_child(void) {
2230 if (!g_initlib) return;
2231
2232 g_initlib = 0;
2233
2234 my_free(global_comp_mutex);
2235 global_comp_mutex = NULL;
2236
2237 my_free(g_global_context);
2238 g_global_context = NULL;
2239
2240 }
2241
blosc_init(void)2242 void blosc_init(void)
2243 {
2244 /* Return if we are already initialized */
2245 if (g_initlib) return;
2246
2247 global_comp_mutex = (pthread_mutex_t*)my_malloc(sizeof(pthread_mutex_t));
2248 pthread_mutex_init(global_comp_mutex, NULL);
2249
2250 g_global_context = (struct blosc_context*)my_malloc(sizeof(struct blosc_context));
2251 g_global_context->threads_started = 0;
2252
2253 #if !defined(_WIN32)
2254 /* atfork handlers are only be registered once, though multiple re-inits may
2255 * occur via blosc_destroy/blosc_init. */
2256 if (!g_atfork_registered) {
2257 g_atfork_registered = 1;
2258 pthread_atfork(NULL, NULL, &blosc_atfork_child);
2259 }
2260 #endif
2261
2262 g_initlib = 1;
2263 }
2264
blosc_destroy(void)2265 void blosc_destroy(void)
2266 {
2267 /* Return if Blosc is not initialized */
2268 if (!g_initlib) return;
2269
2270 g_initlib = 0;
2271
2272 blosc_release_threadpool(g_global_context);
2273 my_free(g_global_context);
2274 g_global_context = NULL;
2275
2276 pthread_mutex_destroy(global_comp_mutex);
2277 my_free(global_comp_mutex);
2278 global_comp_mutex = NULL;
2279 }
2280
blosc_release_threadpool(struct blosc_context * context)2281 int blosc_release_threadpool(struct blosc_context* context)
2282 {
2283 int32_t t;
2284 void* status;
2285 int rc;
2286 int rc2;
2287 (void)rc; // just to avoid 'unused-variable' warning
2288
2289 if (context->threads_started > 0)
2290 {
2291 /* Tell all existing threads to finish */
2292 context->end_threads = 1;
2293
2294 /* Sync threads */
2295 WAIT_INIT(-1, context);
2296
2297 /* Join exiting threads */
2298 for (t=0; t<context->threads_started; t++) {
2299 rc2 = pthread_join(context->threads[t], &status);
2300 if (rc2) {
2301 fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2);
2302 fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
2303 }
2304 }
2305
2306 /* Release mutex and condition variable objects */
2307 pthread_mutex_destroy(&context->count_mutex);
2308
2309 /* Barriers */
2310 #ifdef _POSIX_BARRIERS_MINE
2311 pthread_barrier_destroy(&context->barr_init);
2312 pthread_barrier_destroy(&context->barr_finish);
2313 #else
2314 pthread_mutex_destroy(&context->count_threads_mutex);
2315 pthread_cond_destroy(&context->count_threads_cv);
2316 #endif
2317
2318 /* Thread attributes */
2319 #if !defined(_WIN32)
2320 pthread_attr_destroy(&context->ct_attr);
2321 #endif
2322
2323 }
2324
2325 context->threads_started = 0;
2326
2327 return 0;
2328 }
2329
blosc_free_resources(void)2330 int blosc_free_resources(void)
2331 {
2332 /* Return if Blosc is not initialized */
2333 if (!g_initlib) return -1;
2334
2335 return blosc_release_threadpool(g_global_context);
2336 }
2337