1 /*********************************************************************
2 Blosc - Blocked Shuffling and Compression Library
3
4 Copyright (C) 2021 The Blosc Developers <blosc@blosc.org>
5 https://blosc.org
6 License: BSD 3-Clause (see LICENSE.txt)
7
8 See LICENSE.txt for details about copyright and rights to use.
9 **********************************************************************/
10
11
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <errno.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <assert.h>
18 #include <math.h>
19
20 #include "blosc2.h"
21 #include "blosc-private.h"
22 #include "frame.h"
23
24
25 #if defined(USING_CMAKE)
26 #include "config.h"
27 #endif /* USING_CMAKE */
28 #include "context.h"
29
30 #include "shuffle.h"
31 #include "delta.h"
32 #include "trunc-prec.h"
33 #include "blosclz.h"
34 #include "stune.h"
35 #include "config.h"
36 #include "blosc2/codecs-registry.h"
37 #include "blosc2/filters-registry.h"
38
39 #include "lz4.h"
40 #include "lz4hc.h"
41 #ifdef HAVE_IPP
42 #include <ipps.h>
43 #include <ippdc.h>
44 #endif
45 #if defined(HAVE_ZLIB_NG)
46 #include "zlib.h"
47 #elif defined(HAVE_ZLIB)
48 #include "zlib.h"
49 #endif /* HAVE_MINIZ */
50 #if defined(HAVE_ZSTD)
51 #include "zstd.h"
52 #include "zstd_errors.h"
53 // #include "cover.h" // for experimenting with fast cover training for building dicts
54 #include "zdict.h"
55 #endif /* HAVE_ZSTD */
56
57
58 #if defined(_WIN32) && !defined(__MINGW32__)
59 #include <windows.h>
60 #include <malloc.h>
61 #include <process.h>
62 #define getpid _getpid
63 #endif /* _WIN32 */
64
65 #if defined(_WIN32) && !defined(__GNUC__)
66 #include "win32/pthread.c"
67 #endif
68
/* Library-wide state (shared by every caller of the non-contextual API) */

/* Global context for non-contextual API */
static blosc2_context* g_global_context;
/* Mutex for the non-contextual API; presumably serializes use of
   g_global_context -- TODO confirm at the lock sites. */
static pthread_mutex_t global_comp_mutex;
/* The compressor to use by default */
static int g_compressor = BLOSC_BLOSCLZ;
static int g_delta = 0;
static int16_t g_nthreads = 1;
static int32_t g_force_blocksize = 0;
static int g_initlib = 0;
static blosc2_schunk* g_schunk = NULL; /* the pointer to super-chunk */

/* Registry of user-defined codecs; the first g_ncodecs entries are in use.
   NOTE(review): a uint8_t counter can express at most 255 used slots of 256. */
blosc2_codec g_codecs[256] = {0};
uint8_t g_ncodecs = 0;

/* Registry of user-defined filters, searched by id in pipeline_forward();
   the first g_nfilters entries are in use. */
static blosc2_filter g_filters[256] = {0};
static uint64_t g_nfilters = 0;

/* Presumably the registry of I/O callbacks (first g_nio entries in use) --
   confirm at the registration site. */
static blosc2_io_cb g_io[256] = {0};
static uint64_t g_nio = 0;


// Forward declarations
int init_threadpool(blosc2_context *context);
int release_threadpool(blosc2_context *context);
95
/* Macros for synchronization
 *
 * Both macros expand to several statements (no do { } while (0) wrapper), so
 * they must be used as a full statement inside a braced block, never as the
 * body of an unbraced if/else.  The barrier variants assign to a variable
 * named `rc` that must be declared in the calling function, and on failure
 * they trace an error and `return RET_VAL` from that function.
 */

/* Wait until all threads are initialized */
#ifdef BLOSC_POSIX_BARRIERS
/* Barrier flavour: block on CONTEXT_PTR->barr_init until all participants
   arrive.  PTHREAD_BARRIER_SERIAL_THREAD is a normal (non-error) result. */
#define WAIT_INIT(RET_VAL, CONTEXT_PTR) \
  rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_init); \
  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
    BLOSC_TRACE_ERROR("Could not wait on barrier (init): %d", rc); \
    return((RET_VAL)); \
  }
#else
/* Mutex + condition-variable emulation of a barrier.  Each arrival while
   count_threads < nthreads bumps the counter and sleeps; the arrival that
   finds the counter already at nthreads wakes everybody (so nthreads + 1
   participants are expected -- presumably the workers plus the caller;
   confirm).  NOTE(review): pthread_cond_wait is not wrapped in a predicate
   loop, so a spurious wakeup would release a waiter early. */
#define WAIT_INIT(RET_VAL, CONTEXT_PTR) \
  pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex); \
  if ((CONTEXT_PTR)->count_threads < (CONTEXT_PTR)->nthreads) { \
    (CONTEXT_PTR)->count_threads++; \
    pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv, \
                      &(CONTEXT_PTR)->count_threads_mutex); \
  } \
  else { \
    pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv); \
  } \
  pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);
#endif

/* Wait for all threads to finish */
#ifdef BLOSC_POSIX_BARRIERS
/* Barrier flavour: block on CONTEXT_PTR->barr_finish. */
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR) \
  rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_finish); \
  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
    BLOSC_TRACE_ERROR("Could not wait on barrier (finish)"); \
    return((RET_VAL)); \
  }
#else
/* Emulated flavour: mirror image of WAIT_INIT -- arrivals count the shared
   counter back down to 0, and the arrival that finds it at 0 broadcasts.
   RET_VAL is unused here.  Same spurious-wakeup caveat as WAIT_INIT. */
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR) \
  pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex); \
  if ((CONTEXT_PTR)->count_threads > 0) { \
    (CONTEXT_PTR)->count_threads--; \
    pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv, \
                      &(CONTEXT_PTR)->count_threads_mutex); \
  } \
  else { \
    pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv); \
  } \
  pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);
#endif
141
142
143 /* global variable to change threading backend from Blosc-managed to caller-managed */
144 static blosc_threads_callback threads_callback = 0;
145 static void *threads_callback_data = 0;
146
147 /* non-threadsafe function should be called before any other Blosc function in
148 order to change how threads are managed */
blosc_set_threads_callback(blosc_threads_callback callback,void * callback_data)149 void blosc_set_threads_callback(blosc_threads_callback callback, void *callback_data)
150 {
151 threads_callback = callback;
152 threads_callback_data = callback_data;
153 }
154
155
/* Portable aligned allocation.
   On Windows and on POSIX systems providing posix_memalign() the block is
   aligned to 32 bytes (enough for AVX2 loads); elsewhere a plain malloc()
   block is returned.  Returns NULL (after tracing an error) on failure.
   Release the block with my_free(), never with free() directly. */
static uint8_t* my_malloc(size_t size) {
  void* ptr = NULL;
  int err = 0;

#if defined(_WIN32)
  /* The (void *) cast avoids a warning with MinGW. */
  ptr = (void *)_aligned_malloc(size, 32);
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
  err = posix_memalign(&ptr, 32, size);
#else
  ptr = malloc(size);
#endif /* _WIN32 */

  if (err != 0 || ptr == NULL) {
    BLOSC_TRACE_ERROR("Error allocating memory!");
    return NULL;
  }
  return (uint8_t*)ptr;
}
179
180
/* Counterpart of my_malloc(): return `block` to the allocator that produced
   it.  A NULL block is a harmless no-op on every platform. */
static void my_free(void* block) {
#if !defined(_WIN32)
  free(block);
#else
  _aligned_free(block);
#endif /* !_WIN32 */
}
189
190
191 /*
192 * Conversion routines between compressor and compression libraries
193 */
194
195 /* Return the library code associated with the compressor name */
compname_to_clibcode(const char * compname)196 static int compname_to_clibcode(const char* compname) {
197 if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
198 return BLOSC_BLOSCLZ_LIB;
199 if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
200 return BLOSC_LZ4_LIB;
201 if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
202 return BLOSC_LZ4_LIB;
203 if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
204 return BLOSC_ZLIB_LIB;
205 if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
206 return BLOSC_ZSTD_LIB;
207 for (int i = 0; i < g_ncodecs; ++i) {
208 if (strcmp(compname, g_codecs[i].compname) == 0)
209 return g_codecs[i].complib;
210 }
211 return BLOSC2_ERROR_NOT_FOUND;
212 }
213
214 /* Return the library name associated with the compressor code */
clibcode_to_clibname(int clibcode)215 static const char* clibcode_to_clibname(int clibcode) {
216 if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
217 if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
218 if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
219 if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
220 for (int i = 0; i < g_ncodecs; ++i) {
221 if (clibcode == g_codecs[i].complib)
222 return g_codecs[i].compname;
223 }
224 return NULL; /* should never happen */
225 }
226
227
228 /*
229 * Conversion routines between compressor names and compressor codes
230 */
231
232 /* Get the compressor name associated with the compressor code */
blosc_compcode_to_compname(int compcode,const char ** compname)233 int blosc_compcode_to_compname(int compcode, const char** compname) {
234 int code = -1; /* -1 means non-existent compressor code */
235 const char* name = NULL;
236
237 /* Map the compressor code */
238 if (compcode == BLOSC_BLOSCLZ)
239 name = BLOSC_BLOSCLZ_COMPNAME;
240 else if (compcode == BLOSC_LZ4)
241 name = BLOSC_LZ4_COMPNAME;
242 else if (compcode == BLOSC_LZ4HC)
243 name = BLOSC_LZ4HC_COMPNAME;
244 else if (compcode == BLOSC_ZLIB)
245 name = BLOSC_ZLIB_COMPNAME;
246 else if (compcode == BLOSC_ZSTD)
247 name = BLOSC_ZSTD_COMPNAME;
248 else {
249 for (int i = 0; i < g_ncodecs; ++i) {
250 if (compcode == g_codecs[i].compcode) {
251 name = g_codecs[i].compname;
252 break;
253 }
254 }
255 }
256
257 *compname = name;
258
259 /* Guess if there is support for this code */
260 if (compcode == BLOSC_BLOSCLZ)
261 code = BLOSC_BLOSCLZ;
262 else if (compcode == BLOSC_LZ4)
263 code = BLOSC_LZ4;
264 else if (compcode == BLOSC_LZ4HC)
265 code = BLOSC_LZ4HC;
266 #if defined(HAVE_ZLIB)
267 else if (compcode == BLOSC_ZLIB)
268 code = BLOSC_ZLIB;
269 #endif /* HAVE_ZLIB */
270 #if defined(HAVE_ZSTD)
271 else if (compcode == BLOSC_ZSTD)
272 code = BLOSC_ZSTD;
273 #endif /* HAVE_ZSTD */
274 else if (compcode >= BLOSC_LAST_CODEC)
275 code = compcode;
276 return code;
277 }
278
279 /* Get the compressor code for the compressor name. -1 if it is not available */
blosc_compname_to_compcode(const char * compname)280 int blosc_compname_to_compcode(const char* compname) {
281 int code = -1; /* -1 means non-existent compressor code */
282
283 if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
284 code = BLOSC_BLOSCLZ;
285 }
286 else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
287 code = BLOSC_LZ4;
288 }
289 else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
290 code = BLOSC_LZ4HC;
291 }
292 #if defined(HAVE_ZLIB)
293 else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
294 code = BLOSC_ZLIB;
295 }
296 #endif /* HAVE_ZLIB */
297 #if defined(HAVE_ZSTD)
298 else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
299 code = BLOSC_ZSTD;
300 }
301 #endif /* HAVE_ZSTD */
302 else{
303 for (int i = 0; i < g_ncodecs; ++i) {
304 if (strcmp(compname, g_codecs[i].compname) == 0) {
305 code = g_codecs[i].compcode;
306 break;
307 }
308 }
309 }
310 return code;
311 }
312
313
314 /* Convert compressor code to blosc compressor format code */
compcode_to_compformat(int compcode)315 static int compcode_to_compformat(int compcode) {
316 switch (compcode) {
317 case BLOSC_BLOSCLZ: return BLOSC_BLOSCLZ_FORMAT;
318 case BLOSC_LZ4: return BLOSC_LZ4_FORMAT;
319 case BLOSC_LZ4HC: return BLOSC_LZ4HC_FORMAT;
320
321 #if defined(HAVE_ZLIB)
322 case BLOSC_ZLIB: return BLOSC_ZLIB_FORMAT;
323 #endif /* HAVE_ZLIB */
324
325 #if defined(HAVE_ZSTD)
326 case BLOSC_ZSTD: return BLOSC_ZSTD_FORMAT;
327 break;
328 #endif /* HAVE_ZSTD */
329 default:
330 return BLOSC_UDCODEC_FORMAT;
331 }
332 return -1;
333 }
334
335
336 /* Convert compressor code to blosc compressor format version */
compcode_to_compversion(int compcode)337 static int compcode_to_compversion(int compcode) {
338 /* Write compressor format */
339 switch (compcode) {
340 case BLOSC_BLOSCLZ: return BLOSC_BLOSCLZ_VERSION_FORMAT;
341 case BLOSC_LZ4: return BLOSC_LZ4_VERSION_FORMAT;
342 case BLOSC_LZ4HC: return BLOSC_LZ4HC_VERSION_FORMAT;
343
344 #if defined(HAVE_ZLIB)
345 case BLOSC_ZLIB: return BLOSC_ZLIB_VERSION_FORMAT;
346 break;
347 #endif /* HAVE_ZLIB */
348
349 #if defined(HAVE_ZSTD)
350 case BLOSC_ZSTD: return BLOSC_ZSTD_VERSION_FORMAT;
351 break;
352 #endif /* HAVE_ZSTD */
353 default:
354 for (int i = 0; i < g_ncodecs; ++i) {
355 if (compcode == g_codecs[i].compcode) {
356 return g_codecs[i].compver;
357 }
358 }
359 }
360 return -1;
361 }
362
363
/* Compress `input_length` bytes of `input` into at most `maxout` bytes of
   `output` with LZ4.  Returns the compressed size, 0 when the data does not
   fit in `maxout`, or a negative BLOSC2_ERROR_* code (IPP path only).
   `accel` is ignored on both paths: the IPP encoder has no acceleration knob,
   so the plain LZ4 path is pinned to acceleration 1 to match its behaviour.
   `hash_table` is the IPP encoder's work buffer and must be non-NULL there. */
static int lz4_wrap_compress(const char* input, size_t input_length,
                             char* output, size_t maxout, int accel, void* hash_table) {
  BLOSC_UNUSED_PARAM(accel);
#ifdef HAVE_IPP
  if (hash_table == NULL) {
    return BLOSC2_ERROR_INVALID_PARAM; // the hash table should always be initialized
  }
  int srclen = (int)input_length;
  int dstlen = (int)maxout;
  IppStatus st = ippsEncodeLZ4Safe_8u((const Ipp8u*)input, &srclen,
                                      (Ipp8u*)output, &dstlen, (Ipp8u*)hash_table);
  if (st == ippStsNoErr) {
    return dstlen;
  }
  /* Output not fitting in maxout means "incompressible"; anything else is a real error. */
  return (st == ippStsDstSizeLessExpected) ? 0 : BLOSC2_ERROR_FAILURE;
#else
  BLOSC_UNUSED_PARAM(hash_table);
  return LZ4_compress_fast(input, output, (int)input_length, (int)maxout, 1);
#endif
}
392
393
lz4hc_wrap_compress(const char * input,size_t input_length,char * output,size_t maxout,int clevel)394 static int lz4hc_wrap_compress(const char* input, size_t input_length,
395 char* output, size_t maxout, int clevel) {
396 int cbytes;
397 if (input_length > (size_t)(UINT32_C(2) << 30))
398 return BLOSC2_ERROR_2GB_LIMIT;
399 /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
400 * but levels larger than 9 do not buy much compression. */
401 cbytes = LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
402 clevel);
403 return cbytes;
404 }
405
406
/* Decompress `compressed_length` bytes of LZ4 data into `output`.
   Succeeds only if exactly `maxout` bytes are produced, in which case
   `maxout` is returned; any error or short output yields 0. */
static int lz4_wrap_decompress(const char* input, size_t compressed_length,
                               char* output, size_t maxout) {
  int produced;
#ifdef HAVE_IPP
  int outlen = (int)maxout;
  IppStatus status = ippsDecodeLZ4_8u((const Ipp8u*)input, (int)compressed_length,
                                      (Ipp8u*)output, &outlen);
  produced = (status == ippStsNoErr) ? outlen : -outlen;
#else
  produced = LZ4_decompress_safe(input, output, (int)compressed_length, (int)maxout);
#endif
  return (produced == (int)maxout) ? (int)maxout : 0;
}
425
#if defined(HAVE_ZLIB)
/* zlib does not prefix its public names, but none of them collides with the
   names used in Blosc. */

/* Compress with zlib's compress2() at `clevel` into at most `maxout` bytes.
   Returns the compressed size, or 0 on any zlib failure (including output
   that does not fit). */
static int zlib_wrap_compress(const char* input, size_t input_length,
                              char* output, size_t maxout, int clevel) {
  uLongf dest_len = (uLongf)maxout;
  if (compress2((Bytef*)output, &dest_len, (Bytef*)input,
                (uLong)input_length, clevel) != Z_OK) {
    return 0;
  }
  return (int)dest_len;
}

/* Decompress zlib data with uncompress() into at most `maxout` bytes.
   Returns the number of bytes produced, or 0 on any zlib failure.  The
   produced size is not compared with `maxout` here. */
static int zlib_wrap_decompress(const char* input, size_t compressed_length,
                                char* output, size_t maxout) {
  uLongf dest_len = (uLongf)maxout;
  if (uncompress((Bytef*)output, &dest_len, (Bytef*)input,
                 (uLong)compressed_length) != Z_OK) {
    return 0;
  }
  return (int)dest_len;
}
#endif /* HAVE_ZLIB */
453
454
#if defined(HAVE_ZSTD)
/* Compress `input` into at most `maxout` bytes of `output` with Zstandard,
   reusing the per-thread compression context (created on first use).
   The Blosc level is mapped to a zstd level as 2*clevel - 1 for clevel < 9
   and ZSTD_maxCLevel() otherwise.  When the parent context uses a dictionary,
   the prebuilt dict_cdict is used and clevel is ignored.
   Returns the compressed size, 0 when zstd reports an error (the block is
   then stored uncompressed by the caller), or BLOSC2_ERROR_MEMORY_ALLOC if
   the zstd context cannot be created. */
static int zstd_wrap_compress(struct thread_context* thread_context,
                              const char* input, size_t input_length,
                              char* output, size_t maxout, int clevel) {
  size_t code;
  blosc2_context* context = thread_context->parent_context;

  /* A former `if (clevel == 8) clevel = ZSTD_maxCLevel() - 2;` test ran on
     the already-mapped value, which is always odd or ZSTD_maxCLevel() and
     therefore never 8.  It was dead code, and Blosc level 8 has always meant
     zstd level 15.  It is dropped here and the effective mapping is unchanged. */
  clevel = (clevel < 9) ? clevel * 2 - 1 : ZSTD_maxCLevel();

  if (thread_context->zstd_cctx == NULL) {
    thread_context->zstd_cctx = ZSTD_createCCtx();
    if (thread_context->zstd_cctx == NULL) {
      BLOSC_TRACE_ERROR("Cannot create ZSTD compression context.");
      return BLOSC2_ERROR_MEMORY_ALLOC;
    }
  }

  if (context->use_dict) {
    assert(context->dict_cdict != NULL);
    code = ZSTD_compress_usingCDict(
        thread_context->zstd_cctx, (void*)output, maxout, (void*)input,
        input_length, context->dict_cdict);
  } else {
    code = ZSTD_compressCCtx(thread_context->zstd_cctx,
                             (void*)output, maxout, (void*)input, input_length, clevel);
  }
  if (ZSTD_isError(code)) {
    /* Stay silent: a failure here (typically output not fitting in maxout)
       just makes blosc memcpy this buffer instead. */
    return 0;
  }
  return (int)code;
}

/* Decompress Zstandard data into at most `maxout` bytes of `output`, reusing
   the per-thread decompression context (created on first use) and the
   prebuilt dict_ddict when the parent context uses a dictionary.
   Returns the number of bytes produced, 0 (after tracing) on a zstd error, or
   BLOSC2_ERROR_MEMORY_ALLOC if the zstd context cannot be created. */
static int zstd_wrap_decompress(struct thread_context* thread_context,
                                const char* input, size_t compressed_length,
                                char* output, size_t maxout) {
  size_t code;
  blosc2_context* context = thread_context->parent_context;

  if (thread_context->zstd_dctx == NULL) {
    thread_context->zstd_dctx = ZSTD_createDCtx();
    if (thread_context->zstd_dctx == NULL) {
      BLOSC_TRACE_ERROR("Cannot create ZSTD decompression context.");
      return BLOSC2_ERROR_MEMORY_ALLOC;
    }
  }

  if (context->use_dict) {
    assert(context->dict_ddict != NULL);
    code = ZSTD_decompress_usingDDict(
        thread_context->zstd_dctx, (void*)output, maxout, (void*)input,
        compressed_length, context->dict_ddict);
  } else {
    code = ZSTD_decompressDCtx(thread_context->zstd_dctx,
                               (void*)output, maxout, (void*)input, compressed_length);
  }
  if (ZSTD_isError(code)) {
    BLOSC_TRACE_ERROR("Error in ZSTD decompression: '%s'. Giving up.",
                      ZSTD_getErrorName(code));
    return 0;
  }
  return (int)code;
}
#endif /* HAVE_ZSTD */
515
516 /* Compute acceleration for blosclz */
get_accel(const blosc2_context * context)517 static int get_accel(const blosc2_context* context) {
518 int clevel = context->clevel;
519
520 if (context->compcode == BLOSC_LZ4) {
521 /* This acceleration setting based on discussions held in:
522 * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
523 */
524 return (10 - clevel);
525 }
526 return 1;
527 }
528
529
do_nothing(int8_t filter,char cmode)530 int do_nothing(int8_t filter, char cmode) {
531 if (cmode == 'c') {
532 return (filter == BLOSC_NOFILTER);
533 } else {
534 // TRUNC_PREC do not have to be applied during decompression
535 return ((filter == BLOSC_NOFILTER) || (filter == BLOSC_TRUNC_PREC));
536 }
537 }
538
539
next_filter(const uint8_t * filters,int current_filter,char cmode)540 int next_filter(const uint8_t* filters, int current_filter, char cmode) {
541 for (int i = current_filter - 1; i >= 0; i--) {
542 if (!do_nothing(filters[i], cmode)) {
543 return filters[i];
544 }
545 }
546 return BLOSC_NOFILTER;
547 }
548
549
last_filter(const uint8_t * filters,char cmode)550 int last_filter(const uint8_t* filters, char cmode) {
551 int last_index = -1;
552 for (int i = BLOSC2_MAX_FILTERS - 1; i >= 0; i--) {
553 if (!do_nothing(filters[i], cmode)) {
554 last_index = i;
555 }
556 }
557 return last_index;
558 }
559
560
561 /* Convert filter pipeline to filter flags */
filters_to_flags(const uint8_t * filters)562 static uint8_t filters_to_flags(const uint8_t* filters) {
563 uint8_t flags = 0;
564
565 for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
566 switch (filters[i]) {
567 case BLOSC_SHUFFLE:
568 flags |= BLOSC_DOSHUFFLE;
569 break;
570 case BLOSC_BITSHUFFLE:
571 flags |= BLOSC_DOBITSHUFFLE;
572 break;
573 case BLOSC_DELTA:
574 flags |= BLOSC_DODELTA;
575 break;
576 default :
577 break;
578 }
579 }
580 return flags;
581 }
582
583
584 /* Convert filter flags to filter pipeline */
flags_to_filters(const uint8_t flags,uint8_t * filters)585 static void flags_to_filters(const uint8_t flags, uint8_t* filters) {
586 /* Initialize the filter pipeline */
587 memset(filters, 0, BLOSC2_MAX_FILTERS);
588 /* Fill the filter pipeline */
589 if (flags & BLOSC_DOSHUFFLE)
590 filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_SHUFFLE;
591 if (flags & BLOSC_DOBITSHUFFLE)
592 filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_BITSHUFFLE;
593 if (flags & BLOSC_DODELTA)
594 filters[BLOSC2_MAX_FILTERS - 2] = BLOSC_DELTA;
595 }
596
597
598 /* Get filter flags from header flags */
get_filter_flags(const uint8_t header_flags,const int32_t typesize)599 static uint8_t get_filter_flags(const uint8_t header_flags,
600 const int32_t typesize) {
601 uint8_t flags = 0;
602
603 if ((header_flags & BLOSC_DOSHUFFLE) && (typesize > 1)) {
604 flags |= BLOSC_DOSHUFFLE;
605 }
606 if (header_flags & BLOSC_DOBITSHUFFLE) {
607 flags |= BLOSC_DOBITSHUFFLE;
608 }
609 if (header_flags & BLOSC_DODELTA) {
610 flags |= BLOSC_DODELTA;
611 }
612 if (header_flags & BLOSC_MEMCPYED) {
613 flags |= BLOSC_MEMCPYED;
614 }
615 return flags;
616 }
617
/* In-memory image of a Blosc chunk header.
   read_chunk_header() memcpy()s the first BLOSC_MIN_HEADER_LENGTH bytes of a
   chunk directly onto the leading fields, and the following
   (BLOSC_EXTENDED_HEADER_LENGTH - BLOSC_MIN_HEADER_LENGTH) bytes onto the
   extended part.  Field order and sizes must therefore mirror the serialized
   layout exactly, with no padding.
   NOTE(review): this presumably means 16 + 16 bytes; a static assertion on
   sizeof(blosc_header) would guard it.
   nbytes/blocksize/cbytes are byte-swapped on big-endian hosts. */
typedef struct blosc_header_s {
  uint8_t version;    /* header format version; newer than BLOSC_VERSION_FORMAT is rejected */
  uint8_t versionlz;  /* codec format version (see compcode_to_compversion) */
  uint8_t flags;      /* shuffle/bitshuffle/delta/memcpyed bits; bits 5-7 hold the compressor format */
  uint8_t typesize;   /* item size in bytes, never 0 */
  int32_t nbytes;     /* uncompressed size of the chunk */
  int32_t blocksize;  /* uncompressed size of each block */
  int32_t cbytes;     /* compressed size of the chunk, header included */
  // Extended Blosc2 header
  uint8_t filter_codes[BLOSC2_MAX_FILTERS];  /* filter ids, run in index order while compressing */
  uint8_t udcompcode;     /* codec code, used when the format bits say BLOSC_UDCODEC_FORMAT */
  uint8_t compcode_meta;  /* codec-specific metadata */
  uint8_t filter_meta[BLOSC2_MAX_FILTERS];   /* per-filter parameter (e.g. extra shuffle passes, trunc-prec argument) */
  uint8_t reserved2;
  uint8_t blosc2_flags;   /* BLOSC2_BIGENDIAN, BLOSC2_USEDICT, 0x08 = lazy chunk, special type in bits 4 and up */
} blosc_header;
634
635
/*
 * Parse and validate the Blosc header at the start of `src` into `*header`,
 * which is zeroed first.
 *
 * `srcsize` is the number of readable bytes at `src`.  The minimal header is
 * always read.  The extended (Blosc2) part is read only when
 * `extended_header` is true AND the flags carry both shuffle bits, which is
 * the extended-header marker.  For a minimal header, the filter pipeline is
 * rebuilt from the flag bits instead.  The integer fields are stored
 * little-endian and are swapped on big-endian hosts.
 *
 * Returns 0 on success, or one of these errors:
 * - BLOSC2_ERROR_READ_BUFFER when `srcsize` is too small;
 * - BLOSC2_ERROR_VERSION_SUPPORT for a header newer than this library;
 * - BLOSC2_ERROR_INVALID_HEADER for inconsistent fields (negative nbytes,
 *   bad blocksize/typesize, or too-small cbytes).
 * An undersized run-length (special value) chunk is also reported as
 * BLOSC2_ERROR_READ_BUFFER.
 */
int read_chunk_header(const uint8_t* src, int32_t srcsize, bool extended_header, blosc_header* header)
{
  memset(header, 0, sizeof(blosc_header));

  if (srcsize < BLOSC_MIN_HEADER_LENGTH) {
    BLOSC_TRACE_ERROR("Not enough space to read Blosc header.");
    return BLOSC2_ERROR_READ_BUFFER;
  }

  memcpy(header, src, BLOSC_MIN_HEADER_LENGTH);

  bool little_endian = is_little_endian();

  if (!little_endian) {
    header->nbytes = bswap32_(header->nbytes);
    header->blocksize = bswap32_(header->blocksize);
    header->cbytes = bswap32_(header->cbytes);
  }

  if (header->version > BLOSC_VERSION_FORMAT) {
    /* Version from future */
    return BLOSC2_ERROR_VERSION_SUPPORT;
  }
  if (header->cbytes < BLOSC_MIN_HEADER_LENGTH) {
    BLOSC_TRACE_ERROR("`cbytes` is too small to read min header.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  /* Only a corrupt or hostile header can carry a negative uncompressed size.
     Rejecting it here keeps the blocksize check below, and later arithmetic
     such as sourcesize / blocksize and nbytes % typesize, meaningful. */
  if (header->nbytes < 0) {
    BLOSC_TRACE_ERROR("`nbytes` is negative.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->blocksize <= 0 || (header->nbytes > 0 && (header->blocksize > header->nbytes))) {
    BLOSC_TRACE_ERROR("`blocksize` is zero or greater than uncompressed size");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->blocksize > BLOSC2_MAXBLOCKSIZE) {
    BLOSC_TRACE_ERROR("`blocksize` greater than maximum allowed");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->typesize <= 0 || header->typesize > BLOSC_MAX_TYPESIZE) {
    BLOSC_TRACE_ERROR("`typesize` is zero or greater than max allowed.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }

  /* Read extended header if it is wanted */
  if ((extended_header) && (header->flags & BLOSC_DOSHUFFLE) && (header->flags & BLOSC_DOBITSHUFFLE)) {
    if (header->cbytes < BLOSC_EXTENDED_HEADER_LENGTH) {
      BLOSC_TRACE_ERROR("`cbytes` is too small to read extended header.");
      return BLOSC2_ERROR_INVALID_HEADER;
    }
    if (srcsize < BLOSC_EXTENDED_HEADER_LENGTH) {
      BLOSC_TRACE_ERROR("Not enough space to read Blosc extended header.");
      return BLOSC2_ERROR_READ_BUFFER;
    }

    memcpy((uint8_t *)header + BLOSC_MIN_HEADER_LENGTH, src + BLOSC_MIN_HEADER_LENGTH,
           BLOSC_EXTENDED_HEADER_LENGTH - BLOSC_MIN_HEADER_LENGTH);

    int32_t special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
    if (special_type != 0) {
      if (header->nbytes % header->typesize != 0) {
        BLOSC_TRACE_ERROR("`nbytes` is not a multiple of typesize");
        return BLOSC2_ERROR_INVALID_HEADER;
      }
      if (special_type == BLOSC2_SPECIAL_VALUE) {
        /* A run-length chunk stores one item right after the header. */
        if (header->cbytes < BLOSC_EXTENDED_HEADER_LENGTH + header->typesize) {
          BLOSC_TRACE_ERROR("`cbytes` is too small for run length encoding");
          return BLOSC2_ERROR_READ_BUFFER;
        }
      }
    }
    // The number of filters depends on the version of the header. Blosc2 alpha series
    // did not initialize filters to zero beyond the max supported.
    if (header->version == BLOSC2_VERSION_FORMAT_ALPHA) {
      header->filter_codes[5] = 0;
      header->filter_meta[5] = 0;
    }
  }
  else {
    flags_to_filters(header->flags, header->filter_codes);
  }
  return 0;
}
715
/* Split context->sourcesize into blocks of context->blocksize bytes.
   nblocks is the ceiling of the division; a non-zero leftover means the last
   block is partial.  blocksize must be non-zero. */
static inline void blosc2_calculate_blocks(blosc2_context* context) {
  context->leftover = context->sourcesize % context->blocksize;
  context->nblocks = context->sourcesize / context->blocksize
                     + ((context->leftover > 0) ? 1 : 0);
}
723
/* Load the decompression-relevant settings of a parsed `header` into
   `context`: sizes, codec, block layout and filter pipeline.
   The filters come from the extended header when both shuffle bits are set.
   Otherwise they are derived from the Blosc1 flag bits.
   Returns BLOSC2_ERROR_INVALID_HEADER when a non-lazy chunk claims more
   compressed bytes than context->srcsize, and 0 otherwise. */
static int blosc2_initialize_context_from_header(blosc2_context* context, blosc_header* header) {
  const uint8_t hflags = header->flags;
  bool lazy = false;

  context->header_flags = hflags;
  context->typesize = header->typesize;
  context->sourcesize = header->nbytes;
  context->blocksize = header->blocksize;
  context->blosc2_flags = header->blosc2_flags;

  /* Bits 5-7 hold the compressor format.  User-defined codecs keep their real
     code in the extended header.  NOTE(review): for built-ins this stores a
     *format* code, which is not always the compressor code -- confirm it is
     only used where that is intended. */
  context->compcode = hflags >> 5;
  if (context->compcode == BLOSC_UDCODEC_FORMAT) {
    context->compcode = header->udcompcode;
  }
  blosc2_calculate_blocks(context);

  if (!((hflags & BLOSC_DOSHUFFLE) && (hflags & BLOSC_DOBITSHUFFLE))) {
    /* Minimal header: the pipeline is encoded in the flag bits. */
    context->header_overhead = BLOSC_MIN_HEADER_LENGTH;
    context->filter_flags = get_filter_flags(hflags, context->typesize);
    flags_to_filters(hflags, context->filters);
  } else {
    /* Extended header: the pipeline is stored explicitly. */
    context->header_overhead = BLOSC_EXTENDED_HEADER_LENGTH;
    memcpy(context->filters, header->filter_codes, BLOSC2_MAX_FILTERS);
    memcpy(context->filters_meta, header->filter_meta, BLOSC2_MAX_FILTERS);
    context->compcode_meta = header->compcode_meta;
    context->filter_flags = filters_to_flags(header->filter_codes);
    context->special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
    lazy = (context->blosc2_flags & 0x08u) != 0;
  }

  /* Malformed header check; lazy chunks are exempt (their data is presumably
     not all resident in the source buffer). */
  return (!lazy && header->cbytes > context->srcsize) ? BLOSC2_ERROR_INVALID_HEADER : 0;
}
764
765
/* Fill `header` (zeroed first) from the compression settings in `context`.
   nbytes/blocksize are byte-swapped on big-endian hosts.  cbytes is left at 0
   and is written after compression.  With `extended_header`, the filter
   pipeline, the codec code and meta, and the Blosc2 flags (big-endian marker,
   dictionary use) are stored as well.
   Always returns 0. */
static int blosc2_intialize_header_from_context(blosc2_context* context, blosc_header* header, bool extended_header) {
  memset(header, 0, sizeof(blosc_header));

  header->version = BLOSC_VERSION_FORMAT;
  header->versionlz = compcode_to_compversion(context->compcode);
  header->flags = context->header_flags;
  header->typesize = (uint8_t)context->typesize;
  header->nbytes = (int32_t)context->sourcesize;
  header->blocksize = (int32_t)context->blocksize;

  const bool big_endian = !is_little_endian();
  if (big_endian) {
    header->nbytes = bswap32_(header->nbytes);
    header->blocksize = bswap32_(header->blocksize);
  }

  if (!extended_header) {
    return 0;
  }

  /* Store filter pipeline info at the end of the header */
  memcpy(header->filter_codes, context->filters, BLOSC2_MAX_FILTERS);
  memcpy(header->filter_meta, context->filters_meta, BLOSC2_MAX_FILTERS);
  header->udcompcode = context->compcode;
  header->compcode_meta = context->compcode_meta;

  if (big_endian) {
    header->blosc2_flags |= BLOSC2_BIGENDIAN;
  }
  if (context->use_dict) {
    header->blosc2_flags |= BLOSC2_USEDICT;
  }
  return 0;
}
802
803
/*
 * Forward (compression-side) pipeline for one block of `bsize` bytes read
 * from `src + offset`.
 *
 * Stages:
 * 1. The optional context->prefilter runs first, writing into `dest`.  If
 *    the chunk is flagged BLOSC_MEMCPYED, that output is returned right away
 *    and no filter runs.
 * 2. Every slot of context->filters then runs in index order
 *    (0 .. BLOSC2_MAX_FILTERS-1).  Built-in ids (<= BLOSC2_DEFINED_FILTERS_STOP)
 *    are handled inline; larger ids are looked up in the g_filters registry.
 *
 * After each active stage the output becomes the next input, and `dest` and
 * `tmp` alternate as the write target.  `tmp2` is only extra scratch for
 * bitshuffle.
 *
 * Returns the buffer holding the final result.  This is `src + offset` itself
 * when no prefilter and no filter ran.  Returns NULL if the prefilter or any
 * filter fails, or if a filter id is unknown.
 */
uint8_t* pipeline_forward(struct thread_context* thread_context, const int32_t bsize,
                          const uint8_t* src, const int32_t offset,
                          uint8_t* dest, uint8_t* tmp, uint8_t* tmp2) {
  blosc2_context* context = thread_context->parent_context;
  uint8_t* _src = (uint8_t*)src + offset;
  uint8_t* _tmp = tmp;
  uint8_t* _dest = dest;
  int32_t typesize = context->typesize;
  uint8_t* filters = context->filters;
  uint8_t* filters_meta = context->filters_meta;
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;

  /* Prefilter function */
  if (context->prefilter != NULL) {
    // Create new prefilter parameters for this block (must be private for each thread)
    blosc2_prefilter_params preparams;
    memcpy(&preparams, context->preparams, sizeof(preparams));
    preparams.in = _src;
    preparams.out = _dest;
    preparams.out_size = (size_t)bsize;
    preparams.out_typesize = typesize;
    preparams.out_offset = offset;
    preparams.tid = thread_context->tid;
    preparams.ttmp = thread_context->tmp;
    preparams.ttmp_nbytes = thread_context->tmp_nbytes;
    preparams.ctx = context;

    if (context->prefilter(&preparams) != 0) {
      BLOSC_TRACE_ERROR("Execution of prefilter function failed");
      return NULL;
    }

    if (memcpyed) {
      // No more filters are required
      return _dest;
    }
    // Cycle buffers: the prefilter output becomes the input of the filters,
    // and from now on dest/tmp alternate as the write target
    _src = _dest;
    _dest = _tmp;
    _tmp = _src;
  }

  /* Process the filter pipeline */
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
    int rc = BLOSC2_ERROR_SUCCESS;
    if (filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
      switch (filters[i]) {
        case BLOSC_SHUFFLE:
          // Shuffle is applied filters_meta[i] + 1 times
          for (int j = 0; j <= filters_meta[i]; j++) {
            shuffle(typesize, bsize, _src, _dest);
            // Cycle filters when required (between passes only; the last
            // pass is cycled by the generic step below)
            if (j < filters_meta[i]) {
              _src = _dest;
              _dest = _tmp;
              _tmp = _src;
            }
          }
          break;
        case BLOSC_BITSHUFFLE:
          if (bitshuffle(typesize, bsize, _src, _dest, tmp2) < 0) {
            return NULL;
          }
          break;
        case BLOSC_DELTA:
          // Gets the whole source buffer and the block offset as well;
          // presumably to reference data outside this block -- confirm in delta.c
          delta_encoder(src, offset, bsize, typesize, _src, _dest);
          break;
        case BLOSC_TRUNC_PREC:
          // NOTE(review): any return value of truncate_precision is ignored here
          truncate_precision(filters_meta[i], typesize, bsize, _src, _dest);
          break;
        default:
          if (filters[i] != BLOSC_NOFILTER) {
            BLOSC_TRACE_ERROR("Filter %d not handled during compression\n", filters[i]);
            return NULL;
          }
      }
    }
    else {
      // Look up the user-defined filter by id in the registry and run its
      // forward function; the goto skips the "not found" error below.
      // NOTE(review): int j is compared against the uint64_t g_nfilters
      for (int j = 0; j < g_nfilters; ++j) {
        if (g_filters[j].id == filters[i]) {
          if (g_filters[j].forward != NULL) {
            blosc2_cparams cparams;
            blosc2_ctx_get_cparams(context, &cparams);
            rc = g_filters[j].forward(_src, _dest, bsize, filters_meta[i], &cparams);
          } else {
            BLOSC_TRACE_ERROR("Forward function is NULL");
            return NULL;
          }
          if (rc != BLOSC2_ERROR_SUCCESS) {
            BLOSC_TRACE_ERROR("User-defined filter %d failed during compression\n", filters[i]);
            return NULL;
          }
          goto urfiltersuccess;
        }
      }
      BLOSC_TRACE_ERROR("User-defined filter %d not found during compression\n", filters[i]);
      return NULL;

      urfiltersuccess:;

    }

    // Cycle buffers when required (every slot that is not BLOSC_NOFILTER
    // wrote its output into _dest)
    if (filters[i] != BLOSC_NOFILTER) {
      _src = _dest;
      _dest = _tmp;
      _tmp = _src;
    }
  }
  return _src;
}
915
916
/*
 * Return true when every byte in [ip, ip_bound) equals *ip, i.e. the range
 * is a run of a single repeated byte value.  An empty range counts as a run.
 *
 * The bulk of the range is compared 8 bytes at a time against the byte
 * broadcast into a 64-bit word; the tail is checked byte by byte.
 */
static bool get_run(const uint8_t* ip, const uint8_t* ip_bound) {
  if (ip == ip_bound) {
    /* Nothing to compare, and *ip would be one past the end. */
    return true;
  }
  uint8_t x = *ip;
  int64_t value, value2;
  /* Broadcast the value for every byte in a 64-bit register */
  memset(&value, x, 8);
  /* Compare the remaining length instead of computing `ip_bound - 8`: for
     ranges shorter than 8 bytes that pointer would lie before the buffer,
     which is undefined behaviour. */
  while (ip_bound - ip > 8) {
    /* memcpy is the alignment- and aliasing-safe way to load 8 bytes;
       optimizing compilers turn it into a single load. */
    memcpy(&value2, ip, 8);
    if (value != value2) {
      // Values differ. We don't have a run.
      return false;
    }
    ip += 8;
  }
  /* Look into the remainder */
  while ((ip < ip_bound) && (*ip == x)) ip++;
  return ip == ip_bound;
}
941
942
943 /* Shuffle & compress a single block */
blosc_c(struct thread_context * thread_context,int32_t bsize,int32_t leftoverblock,int32_t ntbytes,int32_t destsize,const uint8_t * src,const int32_t offset,uint8_t * dest,uint8_t * tmp,uint8_t * tmp2)944 static int blosc_c(struct thread_context* thread_context, int32_t bsize,
945 int32_t leftoverblock, int32_t ntbytes, int32_t destsize,
946 const uint8_t* src, const int32_t offset, uint8_t* dest,
947 uint8_t* tmp, uint8_t* tmp2) {
948 blosc2_context* context = thread_context->parent_context;
949 int dont_split = (context->header_flags & 0x10) >> 4;
950 int dict_training = context->use_dict && context->dict_cdict == NULL;
951 int32_t j, neblock, nstreams;
952 int32_t cbytes; /* number of compressed bytes in split */
953 int32_t ctbytes = 0; /* number of compressed bytes in block */
954 int64_t maxout;
955 int32_t typesize = context->typesize;
956 const char* compname;
957 int accel;
958 const uint8_t* _src;
959 uint8_t *_tmp = tmp, *_tmp2 = tmp2;
960 uint8_t *_tmp3 = thread_context->tmp4;
961 int last_filter_index = last_filter(context->filters, 'c');
962 bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
963
964 if (last_filter_index >= 0 || context->prefilter != NULL) {
965 /* Apply the filter pipeline just for the prefilter */
966 if (memcpyed && context->prefilter != NULL) {
967 // We only need the prefilter output
968 _src = pipeline_forward(thread_context, bsize, src, offset, dest, _tmp2, _tmp3);
969 if (_src == NULL) {
970 return BLOSC2_ERROR_FILTER_PIPELINE;
971 }
972 return bsize;
973 }
974 /* Apply regular filter pipeline */
975 _src = pipeline_forward(thread_context, bsize, src, offset, _tmp, _tmp2, _tmp3);
976 if (_src == NULL) {
977 return BLOSC2_ERROR_FILTER_PIPELINE;
978 }
979 } else {
980 _src = src + offset;
981 }
982
983 assert(context->clevel > 0);
984
985 /* Calculate acceleration for different compressors */
986 accel = get_accel(context);
987
988 /* The number of compressed data streams for this block */
989 if (!dont_split && !leftoverblock && !dict_training) {
990 nstreams = (int32_t)typesize;
991 }
992 else {
993 nstreams = 1;
994 }
995 neblock = bsize / nstreams;
996 for (j = 0; j < nstreams; j++) {
997 if (!dict_training) {
998 dest += sizeof(int32_t);
999 ntbytes += sizeof(int32_t);
1000 ctbytes += sizeof(int32_t);
1001
1002 const uint8_t *ip = (uint8_t *) _src + j * neblock;
1003 const uint8_t *ipbound = (uint8_t *) _src + (j + 1) * neblock;
1004
1005 // See whether we have a run here
1006 if (context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH && get_run(ip, ipbound)) {
1007 // A run
1008 int32_t value = _src[j * neblock];
1009 if (ntbytes > destsize) {
1010 return 0; /* Non-compressible data */
1011 }
1012 // Encode the repeated byte in the first (LSB) byte of the length of the split.
1013 _sw32(dest - 4, -value); // write the value in two's complement
1014 if (value > 0) {
1015 // Mark the encoding as a run-length (== 0 is always a 0's run)
1016 ntbytes += 1;
1017 ctbytes += 1;
1018 if (ntbytes > destsize) {
1019 return 0; /* Non-compressible data */
1020 }
1021 // Set MSB bit (sign) to 1 (not really necessary here, but for demonstration purposes)
1022 // dest[-1] |= 0x80;
1023 dest[0] = 0x1; // set run-length bit (0) in token
1024 dest += 1;
1025 }
1026 continue;
1027 }
1028 }
1029
1030 maxout = neblock;
1031 if (ntbytes + maxout > destsize) {
1032 /* avoid buffer * overrun */
1033 maxout = (int64_t)destsize - (int64_t)ntbytes;
1034 if (maxout <= 0) {
1035 return 0; /* non-compressible block */
1036 }
1037 }
1038 if (dict_training) {
1039 // We are in the build dict state, so don't compress
1040 // TODO: copy only a percentage for sampling
1041 memcpy(dest, _src + j * neblock, (unsigned int)neblock);
1042 cbytes = (int32_t)neblock;
1043 }
1044 else if (context->compcode == BLOSC_BLOSCLZ) {
1045 cbytes = blosclz_compress(context->clevel, _src + j * neblock,
1046 (int)neblock, dest, (int)maxout, context);
1047 }
1048 else if (context->compcode == BLOSC_LZ4) {
1049 void *hash_table = NULL;
1050 #ifdef HAVE_IPP
1051 hash_table = (void*)thread_context->lz4_hash_table;
1052 #endif
1053 cbytes = lz4_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
1054 (char*)dest, (size_t)maxout, accel, hash_table);
1055 }
1056 else if (context->compcode == BLOSC_LZ4HC) {
1057 cbytes = lz4hc_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
1058 (char*)dest, (size_t)maxout, context->clevel);
1059 }
1060 #if defined(HAVE_ZLIB)
1061 else if (context->compcode == BLOSC_ZLIB) {
1062 cbytes = zlib_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
1063 (char*)dest, (size_t)maxout, context->clevel);
1064 }
1065 #endif /* HAVE_ZLIB */
1066 #if defined(HAVE_ZSTD)
1067 else if (context->compcode == BLOSC_ZSTD) {
1068 cbytes = zstd_wrap_compress(thread_context,
1069 (char*)_src + j * neblock, (size_t)neblock,
1070 (char*)dest, (size_t)maxout, context->clevel);
1071 }
1072 #endif /* HAVE_ZSTD */
1073 else if (context->compcode > BLOSC2_DEFINED_CODECS_STOP) {
1074 for (int i = 0; i < g_ncodecs; ++i) {
1075 if (g_codecs[i].compcode == context->compcode) {
1076 blosc2_cparams cparams;
1077 blosc2_ctx_get_cparams(context, &cparams);
1078 cbytes = g_codecs[i].encoder(_src + j * neblock,
1079 neblock,
1080 dest,
1081 maxout,
1082 context->compcode_meta,
1083 &cparams);
1084 goto urcodecsuccess;
1085 }
1086 }
1087 BLOSC_TRACE_ERROR("User-defined compressor codec %d not found during compression", context->compcode);
1088 return BLOSC2_ERROR_CODEC_SUPPORT;
1089 urcodecsuccess:
1090 ;
1091 } else {
1092 blosc_compcode_to_compname(context->compcode, &compname);
1093 BLOSC_TRACE_ERROR("Blosc has not been compiled with '%s' compression support."
1094 "Please use one having it.", compname);
1095 return BLOSC2_ERROR_CODEC_SUPPORT;
1096 }
1097
1098 if (cbytes > maxout) {
1099 /* Buffer overrun caused by compression (should never happen) */
1100 return BLOSC2_ERROR_WRITE_BUFFER;
1101 }
1102 if (cbytes < 0) {
1103 /* cbytes should never be negative */
1104 return BLOSC2_ERROR_DATA;
1105 }
1106 if (!dict_training) {
1107 if (cbytes == 0 || cbytes == neblock) {
1108 /* The compressor has been unable to compress data at all. */
1109 /* Before doing the copy, check that we are not running into a
1110 buffer overflow. */
1111 if ((ntbytes + neblock) > destsize) {
1112 return 0; /* Non-compressible data */
1113 }
1114 memcpy(dest, _src + j * neblock, (unsigned int)neblock);
1115 cbytes = neblock;
1116 }
1117 _sw32(dest - 4, cbytes);
1118 }
1119 dest += cbytes;
1120 ntbytes += cbytes;
1121 ctbytes += cbytes;
1122 } /* Closes j < nstreams */
1123
1124 //printf("c%d", ctbytes);
1125 return ctbytes;
1126 }
1127
1128
/* Process the filter pipeline (decompression mode)

   Undoes the filters of one block.  The filter slots are walked from the last
   one (BLOSC2_MAX_FILTERS - 1) downwards, and the loop stops after handling
   slot `last_filter_index`.  The input is the `bsize` decoded bytes in `src`.
   `tmp` and `tmp2` are scratch buffers that are rotated with the current
   source after every applied filter.

   When no postfilter is set, the filter that finishes the copy stage writes
   straight into `dest + offset`.  That is the filter at `last_filter_index`,
   or one whose next filter (per next_filter()) is DELTA.  When a postfilter is
   set, it is called afterwards with the pipeline output as input and
   `dest + offset` as output.  `nblock` is used to compute the block offset
   handed to it.

   Returns 0 on success, or -1 if an unknown built-in filter id was found.  In
   that case the remaining filters are still processed.  Any other failure
   returns a negative BLOSC2 error code. */
int pipeline_backward(struct thread_context* thread_context, const int32_t bsize, uint8_t* dest,
                      const int32_t offset, uint8_t* src, uint8_t* tmp,
                      uint8_t* tmp2, int last_filter_index, int32_t nblock) {
  blosc2_context* context = thread_context->parent_context;
  int32_t typesize = context->typesize;
  uint8_t* filters = context->filters;
  uint8_t* filters_meta = context->filters_meta;
  blosc2_filter * urfilters = context->urfilters;
  uint8_t* _src = src;
  uint8_t* _dest = tmp;
  uint8_t* _tmp = tmp2;
  int errcode = 0;

  for (int i = BLOSC2_MAX_FILTERS - 1; i >= 0; i--) {
    // Delta filter requires the whole chunk ready
    int last_copy_filter = (last_filter_index == i) || (next_filter(filters, i, 'd') == BLOSC_DELTA);
    if (last_copy_filter && context->postfilter == NULL) {
      // Write this filter's output directly into the final destination
      _dest = dest + offset;
    }
    int rc = BLOSC2_ERROR_SUCCESS;
    if (filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
      switch (filters[i]) {
        case BLOSC_SHUFFLE:
          // Undo filters_meta[i] + 1 rounds of shuffling
          for (int j = 0; j <= filters_meta[i]; j++) {
            unshuffle(typesize, bsize, _src, _dest);
            // Cycle filters when required
            if (j < filters_meta[i]) {
              _src = _dest;
              _dest = _tmp;
              _tmp = _src;
            }
            // Check whether we have to copy the intermediate _dest buffer to final destination
            // (with an odd number of extra rounds, the rotation leaves the last
            // result in a scratch buffer rather than in dest + offset)
            if (last_copy_filter && (filters_meta[i] % 2) == 1 && j == filters_meta[i]) {
              memcpy(dest + offset, _dest, (unsigned int) bsize);
            }
          }
          break;
        case BLOSC_BITSHUFFLE:
          // _tmp is handed over as extra scratch space; the chunk format
          // version is read from the chunk header in context->src
          if (bitunshuffle(typesize, bsize, _src, _dest, _tmp, context->src[BLOSC2_CHUNK_VERSION]) < 0) {
            return BLOSC2_ERROR_FILTER_PIPELINE;
          }
          break;
        case BLOSC_DELTA:
          if (context->nthreads == 1) {
            /* Serial mode */
            delta_decoder(dest, offset, bsize, typesize, _dest);
          } else {
            /* Force the thread in charge of the block 0 to go first */
            pthread_mutex_lock(&context->delta_mutex);
            if (context->dref_not_init) {
              if (offset != 0) {
                // Wait until block 0 has been decoded (it is the reference)
                pthread_cond_wait(&context->delta_cv, &context->delta_mutex);
              } else {
                delta_decoder(dest, offset, bsize, typesize, _dest);
                context->dref_not_init = 0;
                pthread_cond_broadcast(&context->delta_cv);
              }
            }
            pthread_mutex_unlock(&context->delta_mutex);
            if (offset != 0) {
              delta_decoder(dest, offset, bsize, typesize, _dest);
            }
          }
          break;
        case BLOSC_TRUNC_PREC:
          // TRUNC_PREC filter does not need to be undone
          break;
        default:
          if (filters[i] != BLOSC_NOFILTER) {
            BLOSC_TRACE_ERROR("Filter %d not handled during decompression.",
                              filters[i]);
            // Remember the failure but keep walking the pipeline
            errcode = -1;
          }
      }
    } else {
      // Look for the filters_meta in user filters and run it
      for (int j = 0; j < g_nfilters; ++j) {
        if (g_filters[j].id == filters[i]) {
          if (g_filters[j].backward != NULL) {
            blosc2_dparams dparams;
            blosc2_ctx_get_dparams(context, &dparams);
            rc = g_filters[j].backward(_src, _dest, bsize, filters_meta[i], &dparams);
          } else {
            BLOSC_TRACE_ERROR("Backward function is NULL");
            return BLOSC2_ERROR_FILTER_PIPELINE;
          }
          if (rc != BLOSC2_ERROR_SUCCESS) {
            BLOSC_TRACE_ERROR("User-defined filter %d failed during decompression.", filters[i]);
            return rc;
          }
          goto urfiltersuccess;
        }
      }
      BLOSC_TRACE_ERROR("User-defined filter %d not found during decompression.", filters[i]);
      return BLOSC2_ERROR_FILTER_PIPELINE;
      urfiltersuccess:;
    }

    // Cycle buffers when required
    // (empty slots and TRUNC_PREC produced no output, so they keep the buffers as they are)
    if ((filters[i] != BLOSC_NOFILTER) && (filters[i] != BLOSC_TRUNC_PREC)) {
      _src = _dest;
      _dest = _tmp;
      _tmp = _src;
    }
    if (last_filter_index == i) {
      break;
    }
  }

  /* Postfilter function */
  if (context->postfilter != NULL) {
    // Create new postfilter parameters for this block (must be private for each thread)
    blosc2_postfilter_params postparams;
    memcpy(&postparams, context->postparams, sizeof(postparams));
    postparams.in = _src;
    postparams.out = dest + offset;
    postparams.size = bsize;
    postparams.typesize = typesize;
    postparams.offset = nblock * context->blocksize;
    postparams.tid = thread_context->tid;
    postparams.ttmp = thread_context->tmp;
    postparams.ttmp_nbytes = thread_context->tmp_nbytes;
    postparams.ctx = context;

    if (context->postfilter(&postparams) != 0) {
      BLOSC_TRACE_ERROR("Execution of postfilter function failed");
      return BLOSC2_ERROR_POSTFILTER;
    }
  }

  return errcode;
}
1263
1264
set_nans(int32_t typesize,uint8_t * dest,int32_t destsize)1265 static int32_t set_nans(int32_t typesize, uint8_t* dest, int32_t destsize) {
1266 // destsize can only be a multiple of typesize
1267 if (destsize % typesize != 0) {
1268 return -1;
1269 }
1270 int32_t nitems = destsize / typesize;
1271 if (nitems == 0) {
1272 return 0;
1273 }
1274
1275 if (typesize == 4) {
1276 float* dest_ = (float*)dest;
1277 float val = nanf("");
1278 for (int i = 0; i < nitems; i++) {
1279 dest_[i] = val;
1280 }
1281 return nitems;
1282 }
1283 else if (typesize == 8) {
1284 double* dest_ = (double*)dest;
1285 double val = nan("");
1286 for (int i = 0; i < nitems; i++) {
1287 dest_[i] = val;
1288 }
1289 return nitems;
1290 }
1291
1292 BLOSC_TRACE_ERROR("Unsupported typesize for NaN");
1293 return BLOSC2_ERROR_DATA;
1294 }
1295
1296
set_values(int32_t typesize,const uint8_t * src,uint8_t * dest,int32_t destsize)1297 static int32_t set_values(int32_t typesize, const uint8_t* src, uint8_t* dest, int32_t destsize) {
1298 // destsize can only be a multiple of typesize
1299 int64_t val8;
1300 int64_t* dest8;
1301 int32_t val4;
1302 int32_t* dest4;
1303 int16_t val2;
1304 int16_t* dest2;
1305 int8_t val1;
1306 int8_t* dest1;
1307
1308 if (destsize % typesize != 0) {
1309 return -1;
1310 }
1311 int32_t nitems = destsize / typesize;
1312 if (nitems == 0) {
1313 return 0;
1314 }
1315
1316 switch (typesize) {
1317 case 8:
1318 val8 = ((int64_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1319 dest8 = (int64_t*)dest;
1320 for (int i = 0; i < nitems; i++) {
1321 dest8[i] = val8;
1322 }
1323 break;
1324 case 4:
1325 val4 = ((int32_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1326 dest4 = (int32_t*)dest;
1327 for (int i = 0; i < nitems; i++) {
1328 dest4[i] = val4;
1329 }
1330 break;
1331 case 2:
1332 val2 = ((int16_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1333 dest2 = (int16_t*)dest;
1334 for (int i = 0; i < nitems; i++) {
1335 dest2[i] = val2;
1336 }
1337 break;
1338 case 1:
1339 val1 = ((int8_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1340 dest1 = (int8_t*)dest;
1341 for (int i = 0; i < nitems; i++) {
1342 dest1[i] = val1;
1343 }
1344 break;
1345 default:
1346 for (int i = 0; i < nitems; i++) {
1347 memcpy(dest + i * typesize, src + BLOSC_EXTENDED_HEADER_LENGTH, typesize);
1348 }
1349 }
1350
1351 return nitems;
1352 }
1353
1354
1355 /* Decompress & unshuffle a single block */
blosc_d(struct thread_context * thread_context,int32_t bsize,int32_t leftoverblock,bool memcpyed,const uint8_t * src,int32_t srcsize,int32_t src_offset,int32_t nblock,uint8_t * dest,int32_t dest_offset,uint8_t * tmp,uint8_t * tmp2)1356 static int blosc_d(
1357 struct thread_context* thread_context, int32_t bsize,
1358 int32_t leftoverblock, bool memcpyed, const uint8_t* src, int32_t srcsize, int32_t src_offset,
1359 int32_t nblock, uint8_t* dest, int32_t dest_offset, uint8_t* tmp, uint8_t* tmp2) {
1360 blosc2_context* context = thread_context->parent_context;
1361 uint8_t* filters = context->filters;
1362 uint8_t *tmp3 = thread_context->tmp4;
1363 int32_t compformat = (context->header_flags & (uint8_t)0xe0) >> 5u;
1364 int dont_split = (context->header_flags & 0x10) >> 4;
1365 int32_t chunk_nbytes;
1366 int32_t chunk_cbytes;
1367 int nstreams;
1368 int32_t neblock;
1369 int32_t nbytes; /* number of decompressed bytes in split */
1370 int32_t cbytes; /* number of compressed bytes in split */
1371 int32_t ctbytes = 0; /* number of compressed bytes in block */
1372 int32_t ntbytes = 0; /* number of uncompressed bytes in block */
1373 uint8_t* _dest;
1374 int32_t typesize = context->typesize;
1375 const char* compname;
1376 int rc;
1377
1378 rc = blosc2_cbuffer_sizes(src, &chunk_nbytes, &chunk_cbytes, NULL);
1379 if (rc < 0) {
1380 return rc;
1381 }
1382
1383 if (context->block_maskout != NULL && context->block_maskout[nblock]) {
1384 // Do not decompress, but act as if we successfully decompressed everything
1385 return bsize;
1386 }
1387
1388 // In some situations (lazychunks) the context can arrive uninitialized
1389 // (but BITSHUFFLE needs it for accessing the format of the chunk)
1390 if (context->src == NULL) {
1391 context->src = src;
1392 }
1393
1394 // Chunks with special values cannot be lazy
1395 bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
1396 (context->blosc2_flags & 0x08u) && !context->special_type);
1397 if (is_lazy) {
1398 // The chunk is on disk, so just lazily load the block
1399 if (context->schunk == NULL) {
1400 BLOSC_TRACE_ERROR("Lazy chunk needs an associated super-chunk.");
1401 return BLOSC2_ERROR_INVALID_PARAM;
1402 }
1403 if (context->schunk->frame == NULL) {
1404 BLOSC_TRACE_ERROR("Lazy chunk needs an associated frame.");
1405 return BLOSC2_ERROR_INVALID_PARAM;
1406 }
1407 blosc2_frame_s* frame = (blosc2_frame_s*)context->schunk->frame;
1408 char* urlpath = frame->urlpath;
1409 int32_t trailer_len = sizeof(int32_t) + sizeof(int64_t) + context->nblocks * sizeof(int32_t);
1410 size_t trailer_offset = BLOSC_EXTENDED_HEADER_LENGTH + context->nblocks * sizeof(int32_t);
1411 int32_t nchunk;
1412 int64_t chunk_offset;
1413 // The nchunk and the offset of the current chunk are in the trailer
1414 nchunk = *(int32_t*)(src + trailer_offset);
1415 chunk_offset = *(int64_t*)(src + trailer_offset + sizeof(int32_t));
1416 // Get the csize of the nblock
1417 int32_t *block_csizes = (int32_t *)(src + trailer_offset + sizeof(int32_t) + sizeof(int64_t));
1418 int32_t block_csize = block_csizes[nblock];
1419 // Read the lazy block on disk
1420 void* fp = NULL;
1421 blosc2_io_cb *io_cb = blosc2_get_io_cb(context->schunk->storage->io->id);
1422 if (io_cb == NULL) {
1423 BLOSC_TRACE_ERROR("Error getting the input/output API");
1424 return BLOSC2_ERROR_PLUGIN_IO;
1425 }
1426
1427 if (frame->sframe) {
1428 // The chunk is not in the frame
1429 char* chunkpath = malloc(strlen(frame->urlpath) + 1 + 8 + strlen(".chunk") + 1);
1430 BLOSC_ERROR_NULL(chunkpath, BLOSC2_ERROR_MEMORY_ALLOC);
1431 sprintf(chunkpath, "%s/%08X.chunk", frame->urlpath, nchunk);
1432 fp = io_cb->open(chunkpath, "rb", context->schunk->storage->io->params);
1433 free(chunkpath);
1434 // The offset of the block is src_offset
1435 io_cb->seek(fp, src_offset, SEEK_SET);
1436 }
1437 else {
1438 fp = io_cb->open(urlpath, "rb", context->schunk->storage->io->params);
1439 // The offset of the block is src_offset
1440 io_cb->seek(fp, chunk_offset + src_offset, SEEK_SET);
1441 }
1442 // We can make use of tmp3 because it will be used after src is not needed anymore
1443 int64_t rbytes = io_cb->read(tmp3, 1, block_csize, fp);
1444 io_cb->close(fp);
1445 if ((int32_t)rbytes != block_csize) {
1446 BLOSC_TRACE_ERROR("Cannot read the (lazy) block out of the fileframe.");
1447 return BLOSC2_ERROR_READ_BUFFER;
1448 }
1449 src = tmp3;
1450 src_offset = 0;
1451 srcsize = block_csize;
1452 }
1453
1454 // If the chunk is memcpyed, we just have to copy the block to dest and return
1455 if (memcpyed) {
1456 int bsize_ = leftoverblock ? chunk_nbytes % context->blocksize : bsize;
1457 if (!context->special_type) {
1458 if (chunk_nbytes + context->header_overhead != chunk_cbytes) {
1459 return BLOSC2_ERROR_WRITE_BUFFER;
1460 }
1461 if (chunk_cbytes < context->header_overhead + (nblock * context->blocksize) + bsize_) {
1462 /* Not enough input to copy block */
1463 return BLOSC2_ERROR_READ_BUFFER;
1464 }
1465 }
1466 if (!is_lazy) {
1467 src += context->header_overhead + nblock * context->blocksize;
1468 }
1469 _dest = dest + dest_offset;
1470 if (context->postfilter != NULL) {
1471 // We are making use of a postfilter, so use a temp for destination
1472 _dest = tmp;
1473 }
1474 rc = 0;
1475 switch (context->special_type) {
1476 case BLOSC2_SPECIAL_VALUE:
1477 // All repeated values
1478 rc = set_values(context->typesize, context->src, _dest, bsize_);
1479 if (rc < 0) {
1480 BLOSC_TRACE_ERROR("set_values failed");
1481 return BLOSC2_ERROR_DATA;
1482 }
1483 break;
1484 case BLOSC2_SPECIAL_NAN:
1485 rc = set_nans(context->typesize, _dest, bsize_);
1486 if (rc < 0) {
1487 BLOSC_TRACE_ERROR("set_nans failed");
1488 return BLOSC2_ERROR_DATA;
1489 }
1490 break;
1491 case BLOSC2_SPECIAL_ZERO:
1492 memset(_dest, 0, bsize_);
1493 break;
1494 case BLOSC2_SPECIAL_UNINIT:
1495 // We do nothing here
1496 break;
1497 default:
1498 memcpy(_dest, src, bsize_);
1499 }
1500 if (context->postfilter != NULL) {
1501 // Create new postfilter parameters for this block (must be private for each thread)
1502 blosc2_postfilter_params postparams;
1503 memcpy(&postparams, context->postparams, sizeof(postparams));
1504 postparams.in = tmp;
1505 postparams.out = dest + dest_offset;
1506 postparams.size = bsize;
1507 postparams.typesize = typesize;
1508 postparams.offset = nblock * context->blocksize;
1509 postparams.tid = thread_context->tid;
1510 postparams.ttmp = thread_context->tmp;
1511 postparams.ttmp_nbytes = thread_context->tmp_nbytes;
1512 postparams.ctx = context;
1513
1514 // Execute the postfilter (the processed block will be copied to dest)
1515 if (context->postfilter(&postparams) != 0) {
1516 BLOSC_TRACE_ERROR("Execution of postfilter function failed");
1517 return BLOSC2_ERROR_POSTFILTER;
1518 }
1519 }
1520 return bsize_;
1521 }
1522
1523 if (!is_lazy && (src_offset <= 0 || src_offset >= srcsize)) {
1524 /* Invalid block src offset encountered */
1525 return BLOSC2_ERROR_DATA;
1526 }
1527
1528 src += src_offset;
1529 srcsize -= src_offset;
1530
1531 int last_filter_index = last_filter(filters, 'd');
1532
1533 if (((last_filter_index >= 0) &&
1534 (next_filter(filters, BLOSC2_MAX_FILTERS, 'd') != BLOSC_DELTA)) ||
1535 context->postfilter != NULL) {
1536 // We are making use of some filter, so use a temp for destination
1537 _dest = tmp;
1538 } else {
1539 // If no filters, or only DELTA in pipeline
1540 _dest = dest + dest_offset;
1541 }
1542
1543 /* The number of compressed data streams for this block */
1544 if (!dont_split && !leftoverblock && !context->use_dict) {
1545 // We don't want to split when in a training dict state
1546 nstreams = (int32_t)typesize;
1547 }
1548 else {
1549 nstreams = 1;
1550 }
1551
1552 neblock = bsize / nstreams;
1553 if (neblock == 0) {
1554 /* Not enough space to output bytes */
1555 return -1;
1556 }
1557 for (int j = 0; j < nstreams; j++) {
1558 if (srcsize < (signed)sizeof(int32_t)) {
1559 /* Not enough input to read compressed size */
1560 return BLOSC2_ERROR_READ_BUFFER;
1561 }
1562 srcsize -= sizeof(int32_t);
1563 cbytes = sw32_(src); /* amount of compressed bytes */
1564 if (cbytes > 0) {
1565 if (srcsize < cbytes) {
1566 /* Not enough input to read compressed bytes */
1567 return BLOSC2_ERROR_READ_BUFFER;
1568 }
1569 srcsize -= cbytes;
1570 }
1571 src += sizeof(int32_t);
1572 ctbytes += (signed)sizeof(int32_t);
1573
1574 /* Uncompress */
1575 if (cbytes == 0) {
1576 // A run of 0's
1577 memset(_dest, 0, (unsigned int)neblock);
1578 nbytes = neblock;
1579 }
1580 else if (cbytes < 0) {
1581 // A negative number means some encoding depending on the token that comes next
1582 uint8_t token;
1583
1584 if (srcsize < (signed)sizeof(uint8_t)) {
1585 // Not enough input to read token */
1586 return BLOSC2_ERROR_READ_BUFFER;
1587 }
1588 srcsize -= sizeof(uint8_t);
1589
1590 token = src[0];
1591 src += 1;
1592 ctbytes += 1;
1593
1594 if (token & 0x1) {
1595 // A run of bytes that are different than 0
1596 if (cbytes < -255) {
1597 // Runs can only encode a byte
1598 return BLOSC2_ERROR_RUN_LENGTH;
1599 }
1600 uint8_t value = -cbytes;
1601 memset(_dest, value, (unsigned int)neblock);
1602 } else {
1603 BLOSC_TRACE_ERROR("Invalid or unsupported compressed stream token value - %d", token);
1604 return BLOSC2_ERROR_RUN_LENGTH;
1605 }
1606 nbytes = neblock;
1607 cbytes = 0; // everything is encoded in the cbytes token
1608 }
1609 else if (cbytes == neblock) {
1610 memcpy(_dest, src, (unsigned int)neblock);
1611 nbytes = (int32_t)neblock;
1612 }
1613 else {
1614 if (compformat == BLOSC_BLOSCLZ_FORMAT) {
1615 nbytes = blosclz_decompress(src, cbytes, _dest, (int)neblock);
1616 }
1617 else if (compformat == BLOSC_LZ4_FORMAT) {
1618 nbytes = lz4_wrap_decompress((char*)src, (size_t)cbytes,
1619 (char*)_dest, (size_t)neblock);
1620 }
1621 #if defined(HAVE_ZLIB)
1622 else if (compformat == BLOSC_ZLIB_FORMAT) {
1623 nbytes = zlib_wrap_decompress((char*)src, (size_t)cbytes,
1624 (char*)_dest, (size_t)neblock);
1625 }
1626 #endif /* HAVE_ZLIB */
1627 #if defined(HAVE_ZSTD)
1628 else if (compformat == BLOSC_ZSTD_FORMAT) {
1629 nbytes = zstd_wrap_decompress(thread_context,
1630 (char*)src, (size_t)cbytes,
1631 (char*)_dest, (size_t)neblock);
1632 }
1633 #endif /* HAVE_ZSTD */
1634 else if (compformat == BLOSC_UDCODEC_FORMAT) {
1635 for (int i = 0; i < g_ncodecs; ++i) {
1636 if (g_codecs[i].compcode == context->compcode) {
1637 blosc2_dparams dparams;
1638 blosc2_ctx_get_dparams(context, &dparams);
1639 nbytes = g_codecs[i].decoder(src,
1640 cbytes,
1641 _dest,
1642 neblock,
1643 context->compcode_meta,
1644 &dparams);
1645 goto urcodecsuccess;
1646 }
1647 }
1648 BLOSC_TRACE_ERROR("User-defined compressor codec %d not found during decompression", context->compcode);
1649 return BLOSC2_ERROR_CODEC_SUPPORT;
1650 urcodecsuccess:
1651 ;
1652 }
1653 else {
1654 compname = clibcode_to_clibname(compformat);
1655 BLOSC_TRACE_ERROR(
1656 "Blosc has not been compiled with decompression "
1657 "support for '%s' format. "
1658 "Please recompile for adding this support.", compname);
1659 return BLOSC2_ERROR_CODEC_SUPPORT;
1660 }
1661
1662 /* Check that decompressed bytes number is correct */
1663 if (nbytes != neblock) {
1664 return BLOSC2_ERROR_DATA;
1665 }
1666
1667 }
1668 src += cbytes;
1669 ctbytes += cbytes;
1670 _dest += nbytes;
1671 ntbytes += nbytes;
1672 } /* Closes j < nstreams */
1673
1674 if (last_filter_index >= 0 || context->postfilter != NULL) {
1675 /* Apply regular filter pipeline */
1676 int errcode = pipeline_backward(thread_context, bsize, dest, dest_offset, tmp, tmp2, tmp3,
1677 last_filter_index, nblock);
1678 if (errcode < 0)
1679 return errcode;
1680 }
1681
1682 /* Return the number of uncompressed bytes */
1683 return (int)ntbytes;
1684 }
1685
1686
1687 /* Serial version for compression/decompression */
serial_blosc(struct thread_context * thread_context)1688 static int serial_blosc(struct thread_context* thread_context) {
1689 blosc2_context* context = thread_context->parent_context;
1690 int32_t j, bsize, leftoverblock;
1691 int32_t cbytes;
1692 int32_t ntbytes = (int32_t)context->output_bytes;
1693 int32_t* bstarts = context->bstarts;
1694 uint8_t* tmp = thread_context->tmp;
1695 uint8_t* tmp2 = thread_context->tmp2;
1696 int dict_training = context->use_dict && (context->dict_cdict == NULL);
1697 bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
1698 if (!context->do_compress && context->special_type) {
1699 // Fake a runlen as if its a memcpyed chunk
1700 memcpyed = true;
1701 }
1702
1703 for (j = 0; j < context->nblocks; j++) {
1704 if (context->do_compress && !memcpyed && !dict_training) {
1705 _sw32(bstarts + j, ntbytes);
1706 }
1707 bsize = context->blocksize;
1708 leftoverblock = 0;
1709 if ((j == context->nblocks - 1) && (context->leftover > 0)) {
1710 bsize = context->leftover;
1711 leftoverblock = 1;
1712 }
1713 if (context->do_compress) {
1714 if (memcpyed && !context->prefilter) {
1715 /* We want to memcpy only */
1716 memcpy(context->dest + context->header_overhead + j * context->blocksize,
1717 context->src + j * context->blocksize, (unsigned int)bsize);
1718 cbytes = (int32_t)bsize;
1719 }
1720 else {
1721 /* Regular compression */
1722 cbytes = blosc_c(thread_context, bsize, leftoverblock, ntbytes,
1723 context->destsize, context->src, j * context->blocksize,
1724 context->dest + ntbytes, tmp, tmp2);
1725 if (cbytes == 0) {
1726 ntbytes = 0; /* uncompressible data */
1727 break;
1728 }
1729 }
1730 }
1731 else {
1732 /* Regular decompression */
1733 // If memcpyed we don't have a bstarts section (because it is not needed)
1734 int32_t src_offset = memcpyed ?
1735 context->header_overhead + j * context->blocksize : sw32_(bstarts + j);
1736 cbytes = blosc_d(thread_context, bsize, leftoverblock, memcpyed,
1737 context->src, context->srcsize, src_offset, j,
1738 context->dest, j * context->blocksize, tmp, tmp2);
1739 }
1740
1741 if (cbytes < 0) {
1742 ntbytes = cbytes; /* error in blosc_c or blosc_d */
1743 break;
1744 }
1745 ntbytes += cbytes;
1746 }
1747
1748 return ntbytes;
1749 }
1750
1751 static void t_blosc_do_job(void *ctxt);
1752
1753 /* Threaded version for compression/decompression */
parallel_blosc(blosc2_context * context)1754 static int parallel_blosc(blosc2_context* context) {
1755 #ifdef BLOSC_POSIX_BARRIERS
1756 int rc;
1757 #endif
1758 /* Set sentinels */
1759 context->thread_giveup_code = 1;
1760 context->thread_nblock = -1;
1761
1762 if (threads_callback) {
1763 threads_callback(threads_callback_data, t_blosc_do_job,
1764 context->nthreads, sizeof(struct thread_context), (void*) context->thread_contexts);
1765 }
1766 else {
1767 /* Synchronization point for all threads (wait for initialization) */
1768 WAIT_INIT(-1, context);
1769
1770 /* Synchronization point for all threads (wait for finalization) */
1771 WAIT_FINISH(-1, context);
1772 }
1773
1774 if (context->thread_giveup_code <= 0) {
1775 /* Compression/decompression gave up. Return error code. */
1776 return context->thread_giveup_code;
1777 }
1778
1779 /* Return the total bytes (de-)compressed in threads */
1780 return (int)context->output_bytes;
1781 }
1782
1783 /* initialize a thread_context that has already been allocated */
init_thread_context(struct thread_context * thread_context,blosc2_context * context,int32_t tid)1784 static int init_thread_context(struct thread_context* thread_context, blosc2_context* context, int32_t tid)
1785 {
1786 int32_t ebsize;
1787
1788 thread_context->parent_context = context;
1789 thread_context->tid = tid;
1790
1791 ebsize = context->blocksize + context->typesize * (signed)sizeof(int32_t);
1792 thread_context->tmp_nbytes = (size_t)4 * ebsize;
1793 thread_context->tmp = my_malloc(thread_context->tmp_nbytes);
1794 BLOSC_ERROR_NULL(thread_context->tmp, BLOSC2_ERROR_MEMORY_ALLOC);
1795 thread_context->tmp2 = thread_context->tmp + ebsize;
1796 thread_context->tmp3 = thread_context->tmp2 + ebsize;
1797 thread_context->tmp4 = thread_context->tmp3 + ebsize;
1798 thread_context->tmp_blocksize = context->blocksize;
1799 #if defined(HAVE_ZSTD)
1800 thread_context->zstd_cctx = NULL;
1801 thread_context->zstd_dctx = NULL;
1802 #endif
1803
1804 /* Create the hash table for LZ4 in case we are using IPP */
1805 #ifdef HAVE_IPP
1806 IppStatus status;
1807 int inlen = thread_context->tmp_blocksize > 0 ? thread_context->tmp_blocksize : 1 << 16;
1808 int hash_size = 0;
1809 status = ippsEncodeLZ4HashTableGetSize_8u(&hash_size);
1810 if (status != ippStsNoErr) {
1811 BLOSC_TRACE_ERROR("Error in ippsEncodeLZ4HashTableGetSize_8u.");
1812 }
1813 Ipp8u *hash_table = ippsMalloc_8u(hash_size);
1814 status = ippsEncodeLZ4HashTableInit_8u(hash_table, inlen);
1815 if (status != ippStsNoErr) {
1816 BLOSC_TRACE_ERROR("Error in ippsEncodeLZ4HashTableInit_8u.");
1817 }
1818 thread_context->lz4_hash_table = hash_table;
1819 #endif
1820 return 0;
1821 }
1822
1823 static struct thread_context*
create_thread_context(blosc2_context * context,int32_t tid)1824 create_thread_context(blosc2_context* context, int32_t tid) {
1825 struct thread_context* thread_context;
1826 thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
1827 BLOSC_ERROR_NULL(thread_context, NULL);
1828 int rc = init_thread_context(thread_context, context, tid);
1829 if (rc < 0) {
1830 return NULL;
1831 }
1832 return thread_context;
1833 }
1834
1835 /* free members of thread_context, but not thread_context itself */
destroy_thread_context(struct thread_context * thread_context)1836 static void destroy_thread_context(struct thread_context* thread_context) {
1837 my_free(thread_context->tmp);
1838 #if defined(HAVE_ZSTD)
1839 if (thread_context->zstd_cctx != NULL) {
1840 ZSTD_freeCCtx(thread_context->zstd_cctx);
1841 }
1842 if (thread_context->zstd_dctx != NULL) {
1843 ZSTD_freeDCtx(thread_context->zstd_dctx);
1844 }
1845 #endif
1846 #ifdef HAVE_IPP
1847 if (thread_context->lz4_hash_table != NULL) {
1848 ippsFree(thread_context->lz4_hash_table);
1849 }
1850 #endif
1851 }
1852
free_thread_context(struct thread_context * thread_context)1853 void free_thread_context(struct thread_context* thread_context) {
1854 destroy_thread_context(thread_context);
1855 my_free(thread_context);
1856 }
1857
1858
check_nthreads(blosc2_context * context)1859 int check_nthreads(blosc2_context* context) {
1860 if (context->nthreads <= 0) {
1861 BLOSC_TRACE_ERROR("nthreads must be a positive integer.");
1862 return BLOSC2_ERROR_INVALID_PARAM;
1863 }
1864
1865 if (context->new_nthreads != context->nthreads) {
1866 if (context->nthreads > 1) {
1867 release_threadpool(context);
1868 }
1869 context->nthreads = context->new_nthreads;
1870 }
1871 if (context->new_nthreads > 1 && context->threads_started == 0) {
1872 init_threadpool(context);
1873 }
1874
1875 return context->nthreads;
1876 }
1877
1878 /* Do the compression or decompression of the buffer depending on the
1879 global params. */
do_job(blosc2_context * context)1880 static int do_job(blosc2_context* context) {
1881 int32_t ntbytes;
1882
1883 /* Set sentinels */
1884 context->dref_not_init = 1;
1885
1886 /* Check whether we need to restart threads */
1887 check_nthreads(context);
1888
1889 /* Run the serial version when nthreads is 1 or when the buffers are
1890 not larger than blocksize */
1891 if (context->nthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
1892 /* The context for this 'thread' has no been initialized yet */
1893 if (context->serial_context == NULL) {
1894 context->serial_context = create_thread_context(context, 0);
1895 }
1896 else if (context->blocksize != context->serial_context->tmp_blocksize) {
1897 free_thread_context(context->serial_context);
1898 context->serial_context = create_thread_context(context, 0);
1899 }
1900 BLOSC_ERROR_NULL(context->serial_context, BLOSC2_ERROR_THREAD_CREATE);
1901 ntbytes = serial_blosc(context->serial_context);
1902 }
1903 else {
1904 ntbytes = parallel_blosc(context);
1905 }
1906
1907 return ntbytes;
1908 }
1909
1910
initialize_context_compression(blosc2_context * context,const void * src,int32_t srcsize,void * dest,int32_t destsize,int clevel,uint8_t const * filters,uint8_t const * filters_meta,int32_t typesize,int compressor,int32_t blocksize,int16_t new_nthreads,int16_t nthreads,blosc2_btune * udbtune,void * btune_config,blosc2_schunk * schunk)1911 static int initialize_context_compression(
1912 blosc2_context* context, const void* src, int32_t srcsize, void* dest,
1913 int32_t destsize, int clevel, uint8_t const *filters,
1914 uint8_t const *filters_meta, int32_t typesize, int compressor,
1915 int32_t blocksize, int16_t new_nthreads, int16_t nthreads,
1916 blosc2_btune *udbtune, void *btune_config,
1917 blosc2_schunk* schunk) {
1918
1919 /* Set parameters */
1920 context->do_compress = 1;
1921 context->src = (const uint8_t*)src;
1922 context->srcsize = srcsize;
1923 context->dest = (uint8_t*)dest;
1924 context->output_bytes = 0;
1925 context->destsize = destsize;
1926 context->sourcesize = srcsize;
1927 context->typesize = (int32_t)typesize;
1928 context->filter_flags = filters_to_flags(filters);
1929 for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
1930 context->filters[i] = filters[i];
1931 context->filters_meta[i] = filters_meta[i];
1932 }
1933 context->compcode = compressor;
1934 context->nthreads = nthreads;
1935 context->new_nthreads = new_nthreads;
1936 context->end_threads = 0;
1937 context->clevel = clevel;
1938 context->schunk = schunk;
1939 context->btune = btune_config;
1940 context->udbtune = udbtune;
1941 /* Tune some compression parameters */
1942 context->blocksize = (int32_t)blocksize;
1943 if (context->btune != NULL) {
1944 context->udbtune->btune_next_cparams(context);
1945 } else {
1946 context->udbtune->btune_next_blocksize(context);
1947 }
1948
1949 char* envvar = getenv("BLOSC_WARN");
1950 int warnlvl = 0;
1951 if (envvar != NULL) {
1952 warnlvl = strtol(envvar, NULL, 10);
1953 }
1954
1955 /* Check buffer size limits */
1956 if (srcsize > BLOSC_MAX_BUFFERSIZE) {
1957 if (warnlvl > 0) {
1958 BLOSC_TRACE_ERROR("Input buffer size cannot exceed %d bytes.",
1959 BLOSC_MAX_BUFFERSIZE);
1960 }
1961 return 0;
1962 }
1963
1964 if (destsize < BLOSC_MAX_OVERHEAD) {
1965 if (warnlvl > 0) {
1966 BLOSC_TRACE_ERROR("Output buffer size should be larger than %d bytes.",
1967 BLOSC_MAX_OVERHEAD);
1968 }
1969 return 0;
1970 }
1971
1972 /* Compression level */
1973 if (clevel < 0 || clevel > 9) {
1974 /* If clevel not in 0..9, print an error */
1975 BLOSC_TRACE_ERROR("`clevel` parameter must be between 0 and 9!.");
1976 return BLOSC2_ERROR_CODEC_PARAM;
1977 }
1978
1979 /* Check typesize limits */
1980 if (context->typesize > BLOSC_MAX_TYPESIZE) {
1981 /* If typesize is too large, treat buffer as an 1-byte stream. */
1982 context->typesize = 1;
1983 }
1984
1985 blosc2_calculate_blocks(context);
1986
1987 return 1;
1988 }
1989
1990
initialize_context_decompression(blosc2_context * context,blosc_header * header,const void * src,int32_t srcsize,void * dest,int32_t destsize)1991 static int initialize_context_decompression(blosc2_context* context, blosc_header* header, const void* src,
1992 int32_t srcsize, void* dest, int32_t destsize) {
1993 int32_t bstarts_end;
1994
1995 context->do_compress = 0;
1996 context->src = (const uint8_t*)src;
1997 context->srcsize = srcsize;
1998 context->dest = (uint8_t*)dest;
1999 context->destsize = destsize;
2000 context->output_bytes = 0;
2001 context->end_threads = 0;
2002
2003 int rc = blosc2_initialize_context_from_header(context, header);
2004 if (rc < 0) {
2005 return rc;
2006 }
2007
2008 /* Check that we have enough space to decompress */
2009 if (context->sourcesize > (int32_t)context->destsize) {
2010 return BLOSC2_ERROR_WRITE_BUFFER;
2011 }
2012
2013 if (context->block_maskout != NULL && context->block_maskout_nitems != context->nblocks) {
2014 BLOSC_TRACE_ERROR("The number of items in block_maskout (%d) must match the number"
2015 " of blocks in chunk (%d).",
2016 context->block_maskout_nitems, context->nblocks);
2017 return BLOSC2_ERROR_DATA;
2018 }
2019
2020 context->special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
2021 if (context->special_type > BLOSC2_SPECIAL_LASTID) {
2022 BLOSC_TRACE_ERROR("Unknown special values ID (%d) ",
2023 context->special_type);
2024 return BLOSC2_ERROR_DATA;
2025 }
2026
2027 int memcpyed = (context->header_flags & (uint8_t) BLOSC_MEMCPYED);
2028 if (memcpyed && (header->cbytes != header->nbytes + context->header_overhead)) {
2029 BLOSC_TRACE_ERROR("Wrong header info for this memcpyed chunk");
2030 return BLOSC2_ERROR_DATA;
2031 }
2032
2033 if ((header->nbytes == 0) && (header->cbytes == context->header_overhead) &&
2034 !context->special_type) {
2035 // A compressed buffer with only a header can only contain a zero-length buffer
2036 return 0;
2037 }
2038
2039 context->bstarts = (int32_t *) (context->src + context->header_overhead);
2040 bstarts_end = context->header_overhead;
2041 if (!context->special_type && !memcpyed) {
2042 /* If chunk is not special or a memcpyed, we do have a bstarts section */
2043 bstarts_end = context->header_overhead + (context->nblocks * sizeof(int32_t));
2044 }
2045
2046 if (srcsize < bstarts_end) {
2047 BLOSC_TRACE_ERROR("`bstarts` exceeds length of source buffer.");
2048 return BLOSC2_ERROR_READ_BUFFER;
2049 }
2050 srcsize -= bstarts_end;
2051
2052 /* Read optional dictionary if flag set */
2053 if (context->blosc2_flags & BLOSC2_USEDICT) {
2054 #if defined(HAVE_ZSTD)
2055 context->use_dict = 1;
2056 if (context->dict_ddict != NULL) {
2057 // Free the existing dictionary (probably from another chunk)
2058 ZSTD_freeDDict(context->dict_ddict);
2059 }
2060 // The trained dictionary is after the bstarts block
2061 if (srcsize < (signed)sizeof(int32_t)) {
2062 BLOSC_TRACE_ERROR("Not enough space to read size of dictionary.");
2063 return BLOSC2_ERROR_READ_BUFFER;
2064 }
2065 srcsize -= sizeof(int32_t);
2066 // Read dictionary size
2067 context->dict_size = (size_t)sw32_(context->src + bstarts_end);
2068 if (context->dict_size <= 0 || context->dict_size > BLOSC2_MAXDICTSIZE) {
2069 BLOSC_TRACE_ERROR("Dictionary size is smaller than minimum or larger than maximum allowed.");
2070 return BLOSC2_ERROR_CODEC_DICT;
2071 }
2072 if (srcsize < (int32_t)context->dict_size) {
2073 BLOSC_TRACE_ERROR("Not enough space to read entire dictionary.");
2074 return BLOSC2_ERROR_READ_BUFFER;
2075 }
2076 srcsize -= context->dict_size;
2077 // Read dictionary
2078 context->dict_buffer = (void*)(context->src + bstarts_end + sizeof(int32_t));
2079 context->dict_ddict = ZSTD_createDDict(context->dict_buffer, context->dict_size);
2080 #endif // HAVE_ZSTD
2081 }
2082
2083 return 0;
2084 }
2085
write_compression_header(blosc2_context * context,bool extended_header)2086 static int write_compression_header(blosc2_context* context, bool extended_header) {
2087 blosc_header header;
2088 int dont_split;
2089 int dict_training = context->use_dict && (context->dict_cdict == NULL);
2090
2091 context->header_flags = 0;
2092
2093 if (context->clevel == 0) {
2094 /* Compression level 0 means buffer to be memcpy'ed */
2095 context->header_flags |= (uint8_t)BLOSC_MEMCPYED;
2096 }
2097 if (context->sourcesize < BLOSC_MIN_BUFFERSIZE) {
2098 /* Buffer is too small. Try memcpy'ing. */
2099 context->header_flags |= (uint8_t)BLOSC_MEMCPYED;
2100 }
2101
2102 bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
2103 if (extended_header) {
2104 /* Indicate that we are building an extended header */
2105 context->header_overhead = BLOSC_EXTENDED_HEADER_LENGTH;
2106 context->header_flags |= (BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE);
2107 /* Store filter pipeline info at the end of the header */
2108 if (dict_training || memcpyed) {
2109 context->bstarts = NULL;
2110 context->output_bytes = context->header_overhead;
2111 } else {
2112 context->bstarts = (int32_t*)(context->dest + context->header_overhead);
2113 context->output_bytes = context->header_overhead + sizeof(int32_t) * context->nblocks;
2114 }
2115 } else {
2116 // Regular header
2117 context->header_overhead = BLOSC_MIN_HEADER_LENGTH;
2118 if (memcpyed) {
2119 context->bstarts = NULL;
2120 context->output_bytes = context->header_overhead;
2121 } else {
2122 context->bstarts = (int32_t *) (context->dest + context->header_overhead);
2123 context->output_bytes = context->header_overhead + sizeof(int32_t) * context->nblocks;
2124 }
2125 }
2126
2127 // when memcpyed bit is set, there is no point in dealing with others
2128 if (!memcpyed) {
2129 if (context->filter_flags & BLOSC_DOSHUFFLE) {
2130 /* Byte-shuffle is active */
2131 context->header_flags |= BLOSC_DOSHUFFLE;
2132 }
2133
2134 if (context->filter_flags & BLOSC_DOBITSHUFFLE) {
2135 /* Bit-shuffle is active */
2136 context->header_flags |= BLOSC_DOBITSHUFFLE;
2137 }
2138
2139 if (context->filter_flags & BLOSC_DODELTA) {
2140 /* Delta is active */
2141 context->header_flags |= BLOSC_DODELTA;
2142 }
2143
2144 dont_split = !split_block(context, context->typesize,
2145 context->blocksize, extended_header);
2146
2147 /* dont_split is in bit 4 */
2148 context->header_flags |= dont_split << 4;
2149 /* codec starts at bit 5 */
2150 uint8_t compformat = compcode_to_compformat(context->compcode);
2151 context->header_flags |= compformat << 5;
2152 }
2153
2154 // Create blosc header and store to dest
2155 blosc2_intialize_header_from_context(context, &header, extended_header);
2156
2157 memcpy(context->dest, &header, (extended_header) ?
2158 BLOSC_EXTENDED_HEADER_LENGTH : BLOSC_MIN_HEADER_LENGTH);
2159
2160 return 1;
2161 }
2162
2163
blosc_compress_context(blosc2_context * context)2164 int blosc_compress_context(blosc2_context* context) {
2165 int ntbytes = 0;
2166 blosc_timestamp_t last, current;
2167 bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
2168
2169 blosc_set_timestamp(&last);
2170
2171 if (!memcpyed) {
2172 /* Do the actual compression */
2173 ntbytes = do_job(context);
2174 if (ntbytes < 0) {
2175 return ntbytes;
2176 }
2177 if (ntbytes == 0) {
2178 // Try out with a memcpy later on (last chance for fitting src buffer in dest).
2179 context->header_flags |= (uint8_t)BLOSC_MEMCPYED;
2180 memcpyed = true;
2181 }
2182 }
2183
2184 if (memcpyed) {
2185 if (context->sourcesize + context->header_overhead > context->destsize) {
2186 /* We are exceeding maximum output size */
2187 ntbytes = 0;
2188 }
2189 else {
2190 context->output_bytes = context->header_overhead;
2191 ntbytes = do_job(context);
2192 if (ntbytes < 0) {
2193 return ntbytes;
2194 }
2195 // Success! update the memcpy bit in header
2196 context->dest[BLOSC2_CHUNK_FLAGS] = context->header_flags;
2197 // and clear the memcpy bit in context (for next reuse)
2198 context->header_flags &= ~(uint8_t)BLOSC_MEMCPYED;
2199 }
2200 }
2201 else {
2202 // Check whether we have a run for the whole chunk
2203 int start_csizes = context->header_overhead + 4 * context->nblocks;
2204 int dont_split = (context->header_flags & 0x10) >> 4;
2205 int nstreams = context->nblocks;
2206 if (!dont_split) {
2207 // When splitting, the number of streams is computed differently
2208 if (context->leftover) {
2209 nstreams = (context->nblocks - 1) * context->typesize + 1;
2210 }
2211 else {
2212 nstreams *= context->typesize;
2213 }
2214 }
2215 if (ntbytes == start_csizes + nstreams * sizeof(int32_t)) {
2216 // The streams are all zero runs (by construction). Encode it...
2217 context->dest[BLOSC2_CHUNK_BLOSC2_FLAGS] |= BLOSC2_SPECIAL_ZERO << 4;
2218 // ...and assign the new chunk length
2219 ntbytes = context->header_overhead;
2220 }
2221 }
2222
2223 /* Set the number of compressed bytes in header */
2224 _sw32(context->dest + BLOSC2_CHUNK_CBYTES, ntbytes);
2225
2226 /* Set the number of bytes in dest buffer (might be useful for btune) */
2227 context->destsize = ntbytes;
2228
2229 if (context->btune != NULL) {
2230 blosc_set_timestamp(¤t);
2231 double ctime = blosc_elapsed_secs(last, current);
2232 context->udbtune->btune_update(context, ctime);
2233 }
2234
2235 return ntbytes;
2236 }
2237
2238
2239 /* The public secure routine for compression with context. */
blosc2_compress_ctx(blosc2_context * context,const void * src,int32_t srcsize,void * dest,int32_t destsize)2240 int blosc2_compress_ctx(blosc2_context* context, const void* src, int32_t srcsize,
2241 void* dest, int32_t destsize) {
2242 int error, cbytes;
2243
2244 if (context->do_compress != 1) {
2245 BLOSC_TRACE_ERROR("Context is not meant for compression. Giving up.");
2246 return BLOSC2_ERROR_INVALID_PARAM;
2247 }
2248
2249 error = initialize_context_compression(
2250 context, src, srcsize, dest, destsize,
2251 context->clevel, context->filters, context->filters_meta,
2252 context->typesize, context->compcode, context->blocksize,
2253 context->new_nthreads, context->nthreads,
2254 context->udbtune, context->btune, context->schunk);
2255 if (error <= 0) {
2256 return error;
2257 }
2258
2259 /* Write the extended header */
2260 error = write_compression_header(context, true);
2261 if (error < 0) {
2262 return error;
2263 }
2264
2265 cbytes = blosc_compress_context(context);
2266 if (cbytes < 0) {
2267 return cbytes;
2268 }
2269
2270 if (context->use_dict && context->dict_cdict == NULL) {
2271
2272 if (context->compcode != BLOSC_ZSTD) {
2273 const char* compname;
2274 compname = clibcode_to_clibname(context->compcode);
2275 BLOSC_TRACE_ERROR("Codec %s does not support dicts. Giving up.",
2276 compname);
2277 return BLOSC2_ERROR_CODEC_DICT;
2278 }
2279
2280 #ifdef HAVE_ZSTD
2281 // Build the dictionary out of the filters outcome and compress with it
2282 int32_t dict_maxsize = BLOSC2_MAXDICTSIZE;
2283 // Do not make the dict more than 5% larger than uncompressed buffer
2284 if (dict_maxsize > srcsize / 20) {
2285 dict_maxsize = srcsize / 20;
2286 }
2287 void* samples_buffer = context->dest + context->header_overhead;
2288 unsigned nblocks = 8; // the minimum that accepts zstd as of 1.4.0
2289 unsigned sample_fraction = 1; // 1 allows to use most of the chunk for training
2290 size_t sample_size = context->sourcesize / nblocks / sample_fraction;
2291
2292 // Populate the samples sizes for training the dictionary
2293 size_t* samples_sizes = malloc(nblocks * sizeof(void*));
2294 BLOSC_ERROR_NULL(samples_sizes, BLOSC2_ERROR_MEMORY_ALLOC);
2295 for (size_t i = 0; i < nblocks; i++) {
2296 samples_sizes[i] = sample_size;
2297 }
2298
2299 // Train from samples
2300 void* dict_buffer = malloc(dict_maxsize);
2301 BLOSC_ERROR_NULL(dict_buffer, BLOSC2_ERROR_MEMORY_ALLOC);
2302 int32_t dict_actual_size = (int32_t)ZDICT_trainFromBuffer(dict_buffer, dict_maxsize, samples_buffer, samples_sizes, nblocks);
2303
2304 // TODO: experiment with parameters of low-level fast cover algorithm
2305 // Note that this API is still unstable. See: https://github.com/facebook/zstd/issues/1599
2306 // ZDICT_fastCover_params_t fast_cover_params;
2307 // memset(&fast_cover_params, 0, sizeof(fast_cover_params));
2308 // fast_cover_params.d = nblocks;
2309 // fast_cover_params.steps = 4;
2310 // fast_cover_params.zParams.compressionLevel = context->clevel;
2311 //size_t dict_actual_size = ZDICT_optimizeTrainFromBuffer_fastCover(dict_buffer, dict_maxsize, samples_buffer, samples_sizes, nblocks, &fast_cover_params);
2312
2313 if (ZDICT_isError(dict_actual_size) != ZSTD_error_no_error) {
2314 BLOSC_TRACE_ERROR("Error in ZDICT_trainFromBuffer(): '%s'."
2315 " Giving up.", ZDICT_getErrorName(dict_actual_size));
2316 return BLOSC2_ERROR_CODEC_DICT;
2317 }
2318 assert(dict_actual_size > 0);
2319 free(samples_sizes);
2320
2321 // Update bytes counter and pointers to bstarts for the new compressed buffer
2322 context->bstarts = (int32_t*)(context->dest + context->header_overhead);
2323 context->output_bytes = context->header_overhead + sizeof(int32_t) * context->nblocks;
2324 /* Write the size of trained dict at the end of bstarts */
2325 _sw32(context->dest + context->output_bytes, (int32_t)dict_actual_size);
2326 context->output_bytes += sizeof(int32_t);
2327 /* Write the trained dict afterwards */
2328 context->dict_buffer = context->dest + context->output_bytes;
2329 memcpy(context->dict_buffer, dict_buffer, (unsigned int)dict_actual_size);
2330 context->dict_cdict = ZSTD_createCDict(dict_buffer, dict_actual_size, 1); // TODO: use get_accel()
2331 free(dict_buffer); // the dictionary is copied in the header now
2332 context->output_bytes += (int32_t)dict_actual_size;
2333 context->dict_size = dict_actual_size;
2334
2335 /* Compress with dict */
2336 cbytes = blosc_compress_context(context);
2337
2338 // Invalidate the dictionary for compressing other chunks using the same context
2339 context->dict_buffer = NULL;
2340 ZSTD_freeCDict(context->dict_cdict);
2341 context->dict_cdict = NULL;
2342 #endif // HAVE_ZSTD
2343 }
2344
2345 return cbytes;
2346 }
2347
2348
build_filters(const int doshuffle,const int delta,const size_t typesize,uint8_t * filters)2349 void build_filters(const int doshuffle, const int delta,
2350 const size_t typesize, uint8_t* filters) {
2351
2352 /* Fill the end part of the filter pipeline */
2353 if ((doshuffle == BLOSC_SHUFFLE) && (typesize > 1))
2354 filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_SHUFFLE;
2355 if (doshuffle == BLOSC_BITSHUFFLE)
2356 filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_BITSHUFFLE;
2357 if (delta)
2358 filters[BLOSC2_MAX_FILTERS - 2] = BLOSC_DELTA;
2359 }
2360
2361 /* The public secure routine for compression. */
blosc2_compress(int clevel,int doshuffle,int32_t typesize,const void * src,int32_t srcsize,void * dest,int32_t destsize)2362 int blosc2_compress(int clevel, int doshuffle, int32_t typesize,
2363 const void* src, int32_t srcsize, void* dest, int32_t destsize) {
2364 int error;
2365 int result;
2366 char* envvar;
2367
2368 /* Check whether the library should be initialized */
2369 if (!g_initlib) blosc_init();
2370
2371 /* Check for a BLOSC_CLEVEL environment variable */
2372 envvar = getenv("BLOSC_CLEVEL");
2373 if (envvar != NULL) {
2374 long value;
2375 value = strtol(envvar, NULL, 10);
2376 if ((value != EINVAL) && (value >= 0)) {
2377 clevel = (int)value;
2378 }
2379 }
2380
2381 /* Check for a BLOSC_SHUFFLE environment variable */
2382 envvar = getenv("BLOSC_SHUFFLE");
2383 if (envvar != NULL) {
2384 if (strcmp(envvar, "NOSHUFFLE") == 0) {
2385 doshuffle = BLOSC_NOSHUFFLE;
2386 }
2387 if (strcmp(envvar, "SHUFFLE") == 0) {
2388 doshuffle = BLOSC_SHUFFLE;
2389 }
2390 if (strcmp(envvar, "BITSHUFFLE") == 0) {
2391 doshuffle = BLOSC_BITSHUFFLE;
2392 }
2393 }
2394
2395 /* Check for a BLOSC_DELTA environment variable */
2396 envvar = getenv("BLOSC_DELTA");
2397 if (envvar != NULL) {
2398 if (strcmp(envvar, "1") == 0) {
2399 blosc_set_delta(1);
2400 } else {
2401 blosc_set_delta(0);
2402 }
2403 }
2404
2405 /* Check for a BLOSC_TYPESIZE environment variable */
2406 envvar = getenv("BLOSC_TYPESIZE");
2407 if (envvar != NULL) {
2408 long value;
2409 value = strtol(envvar, NULL, 10);
2410 if ((value != EINVAL) && (value > 0)) {
2411 typesize = (size_t)value;
2412 }
2413 }
2414
2415 /* Check for a BLOSC_COMPRESSOR environment variable */
2416 envvar = getenv("BLOSC_COMPRESSOR");
2417 if (envvar != NULL) {
2418 result = blosc_set_compressor(envvar);
2419 if (result < 0) { return result; }
2420 }
2421
2422 /* Check for a BLOSC_COMPRESSOR environment variable */
2423 envvar = getenv("BLOSC_BLOCKSIZE");
2424 if (envvar != NULL) {
2425 long blocksize;
2426 blocksize = strtol(envvar, NULL, 10);
2427 if ((blocksize != EINVAL) && (blocksize > 0)) {
2428 blosc_set_blocksize((size_t)blocksize);
2429 }
2430 }
2431
2432 /* Check for a BLOSC_NTHREADS environment variable */
2433 envvar = getenv("BLOSC_NTHREADS");
2434 if (envvar != NULL) {
2435 long nthreads;
2436 nthreads = strtol(envvar, NULL, 10);
2437 if ((nthreads != EINVAL) && (nthreads > 0)) {
2438 result = blosc_set_nthreads((int)nthreads);
2439 if (result < 0) { return result; }
2440 }
2441 }
2442
2443 /* Check for a BLOSC_NOLOCK environment variable. It is important
2444 that this should be the last env var so that it can take the
2445 previous ones into account */
2446 envvar = getenv("BLOSC_NOLOCK");
2447 if (envvar != NULL) {
2448 // TODO: here is the only place that returns an extended header from
2449 // a blosc_compress() call. This should probably be fixed.
2450 const char *compname;
2451 blosc2_context *cctx;
2452 blosc2_cparams cparams = BLOSC2_CPARAMS_DEFAULTS;
2453
2454 blosc_compcode_to_compname(g_compressor, &compname);
2455 /* Create a context for compression */
2456 build_filters(doshuffle, g_delta, typesize, cparams.filters);
2457 // TODO: cparams can be shared in a multithreaded environment. do a copy!
2458 cparams.typesize = (uint8_t)typesize;
2459 cparams.compcode = (uint8_t)g_compressor;
2460 cparams.clevel = (uint8_t)clevel;
2461 cparams.nthreads = g_nthreads;
2462 cctx = blosc2_create_cctx(cparams);
2463 /* Do the actual compression */
2464 result = blosc2_compress_ctx(cctx, src, srcsize, dest, destsize);
2465 /* Release context resources */
2466 blosc2_free_ctx(cctx);
2467 return result;
2468 }
2469
2470 pthread_mutex_lock(&global_comp_mutex);
2471
2472 /* Initialize a context compression */
2473 uint8_t* filters = calloc(1, BLOSC2_MAX_FILTERS);
2474 BLOSC_ERROR_NULL(filters, BLOSC2_ERROR_MEMORY_ALLOC);
2475 uint8_t* filters_meta = calloc(1, BLOSC2_MAX_FILTERS);
2476 BLOSC_ERROR_NULL(filters_meta, BLOSC2_ERROR_MEMORY_ALLOC);
2477 build_filters(doshuffle, g_delta, typesize, filters);
2478 error = initialize_context_compression(
2479 g_global_context, src, srcsize, dest, destsize, clevel, filters,
2480 filters_meta, (int32_t)typesize, g_compressor, g_force_blocksize, g_nthreads, g_nthreads,
2481 &BTUNE_DEFAULTS, NULL, g_schunk);
2482 free(filters);
2483 free(filters_meta);
2484 if (error <= 0) {
2485 pthread_mutex_unlock(&global_comp_mutex);
2486 return error;
2487 }
2488
2489 envvar = getenv("BLOSC_BLOSC1_COMPAT");
2490 if (envvar != NULL) {
2491 /* Write chunk header without extended header (Blosc1 compatibility mode) */
2492 error = write_compression_header(g_global_context, false);
2493 }
2494 else {
2495 error = write_compression_header(g_global_context, true);
2496 }
2497 if (error < 0) {
2498 pthread_mutex_unlock(&global_comp_mutex);
2499 return error;
2500 }
2501
2502 result = blosc_compress_context(g_global_context);
2503
2504 pthread_mutex_unlock(&global_comp_mutex);
2505
2506 return result;
2507 }
2508
2509
/* The public routine for compression (Blosc1 API).
   The size_t arguments are clamped to INT32_MAX rather than cast before
   being forwarded to blosc2_compress().  A plain cast can wrap a huge
   `nbytes` into a small positive size and silently compress only part of
   the buffer.  A clamped nbytes is then rejected by blosc2_compress()'s
   buffer size limit (assuming BLOSC_MAX_BUFFERSIZE < INT32_MAX -- TODO
   confirm).  A clamped destsize merely under-reports the real capacity. */
int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
                   const void* src, void* dest, size_t destsize) {
  int32_t typesize32 = (typesize > INT32_MAX) ? INT32_MAX : (int32_t)typesize;
  int32_t nbytes32 = (nbytes > INT32_MAX) ? INT32_MAX : (int32_t)nbytes;
  int32_t destsize32 = (destsize > INT32_MAX) ? INT32_MAX : (int32_t)destsize;
  return blosc2_compress(clevel, doshuffle, typesize32, src, nbytes32, dest, destsize32);
}
2515
2516
2517
blosc_run_decompression_with_context(blosc2_context * context,const void * src,int32_t srcsize,void * dest,int32_t destsize)2518 int blosc_run_decompression_with_context(blosc2_context* context, const void* src, int32_t srcsize,
2519 void* dest, int32_t destsize) {
2520 blosc_header header;
2521 int32_t ntbytes;
2522 uint8_t* _src = (uint8_t*)src;
2523 int rc;
2524
2525 rc = read_chunk_header(src, srcsize, true, &header);
2526 if (rc < 0) {
2527 return rc;
2528 }
2529
2530 if (header.nbytes > destsize) {
2531 // Not enough space for writing into the destination
2532 return BLOSC2_ERROR_WRITE_BUFFER;
2533 }
2534
2535 rc = initialize_context_decompression(context, &header, src, srcsize, dest, destsize);
2536 if (rc < 0) {
2537 return rc;
2538 }
2539
2540 /* Do the actual decompression */
2541 ntbytes = do_job(context);
2542 if (ntbytes < 0) {
2543 return ntbytes;
2544 }
2545
2546 assert(ntbytes <= (int32_t)destsize);
2547 return ntbytes;
2548 }
2549
2550
2551 /* The public secure routine for decompression with context. */
blosc2_decompress_ctx(blosc2_context * context,const void * src,int32_t srcsize,void * dest,int32_t destsize)2552 int blosc2_decompress_ctx(blosc2_context* context, const void* src, int32_t srcsize,
2553 void* dest, int32_t destsize) {
2554 int result;
2555
2556 if (context->do_compress != 0) {
2557 BLOSC_TRACE_ERROR("Context is not meant for decompression. Giving up.");
2558 return BLOSC2_ERROR_INVALID_PARAM;
2559 }
2560
2561 result = blosc_run_decompression_with_context(context, src, srcsize, dest, destsize);
2562
2563 // Reset a possible block_maskout
2564 if (context->block_maskout != NULL) {
2565 free(context->block_maskout);
2566 context->block_maskout = NULL;
2567 }
2568 context->block_maskout_nitems = 0;
2569
2570 return result;
2571 }
2572
2573
2574 /* The public secure routine for decompression. */
blosc2_decompress(const void * src,int32_t srcsize,void * dest,int32_t destsize)2575 int blosc2_decompress(const void* src, int32_t srcsize, void* dest, int32_t destsize) {
2576 int result;
2577 char* envvar;
2578 long nthreads;
2579 blosc2_context *dctx;
2580 blosc2_dparams dparams = BLOSC2_DPARAMS_DEFAULTS;
2581
2582 /* Check whether the library should be initialized */
2583 if (!g_initlib) blosc_init();
2584
2585 /* Check for a BLOSC_NTHREADS environment variable */
2586 envvar = getenv("BLOSC_NTHREADS");
2587 if (envvar != NULL) {
2588 nthreads = strtol(envvar, NULL, 10);
2589 if ((nthreads != EINVAL) && (nthreads > 0)) {
2590 result = blosc_set_nthreads((int)nthreads);
2591 if (result < 0) { return result; }
2592 }
2593 }
2594
2595 /* Check for a BLOSC_NOLOCK environment variable. It is important
2596 that this should be the last env var so that it can take the
2597 previous ones into account */
2598 envvar = getenv("BLOSC_NOLOCK");
2599 if (envvar != NULL) {
2600 dparams.nthreads = g_nthreads;
2601 dctx = blosc2_create_dctx(dparams);
2602 result = blosc2_decompress_ctx(dctx, src, srcsize, dest, destsize);
2603 blosc2_free_ctx(dctx);
2604 return result;
2605 }
2606
2607 pthread_mutex_lock(&global_comp_mutex);
2608
2609 result = blosc_run_decompression_with_context(
2610 g_global_context, src, srcsize, dest, destsize);
2611
2612 pthread_mutex_unlock(&global_comp_mutex);
2613
2614 return result;
2615 }
2616
2617
/* The public routine for decompression (Blosc1 API).
   The source size is unknown in this API, so INT32_MAX is passed.
   `destsize` is clamped to INT32_MAX instead of being cast: a cast could
   wrap a large capacity into a tiny or negative one and spuriously fail. */
int blosc_decompress(const void* src, void* dest, size_t destsize) {
  int32_t destsize32 = (destsize > INT32_MAX) ? INT32_MAX : (int32_t)destsize;
  return blosc2_decompress(src, INT32_MAX, dest, destsize32);
}
2622
2623
2624 /* Specific routine optimized for decompression a small number of
2625 items out of a compressed chunk. This does not use threads because
2626 it would affect negatively to performance. */
_blosc_getitem(blosc2_context * context,blosc_header * header,const void * src,int32_t srcsize,int start,int nitems,void * dest,int32_t destsize)2627 int _blosc_getitem(blosc2_context* context, blosc_header* header, const void* src, int32_t srcsize,
2628 int start, int nitems, void* dest, int32_t destsize) {
2629 uint8_t* _src = (uint8_t*)(src); /* current pos for source buffer */
2630 uint8_t* _dest = (uint8_t*)(dest);
2631 int32_t ntbytes = 0; /* the number of uncompressed bytes */
2632 int32_t bsize, bsize2, ebsize, leftoverblock;
2633 int32_t cbytes;
2634 int32_t startb, stopb;
2635 int32_t stop = start + nitems;
2636 int j, rc;
2637
2638 if (nitems == 0) {
2639 // We have nothing to do
2640 return 0;
2641 }
2642 if (nitems * header->typesize > destsize) {
2643 BLOSC_TRACE_ERROR("`nitems`*`typesize` out of dest bounds.");
2644 return BLOSC2_ERROR_WRITE_BUFFER;
2645 }
2646
2647 context->bstarts = (int32_t*)(_src + context->header_overhead);
2648
2649 /* Check region boundaries */
2650 if ((start < 0) || (start * header->typesize > header->nbytes)) {
2651 BLOSC_TRACE_ERROR("`start` out of bounds.");
2652 return BLOSC2_ERROR_INVALID_PARAM;
2653 }
2654
2655 if ((stop < 0) || (stop * header->typesize > header->nbytes)) {
2656 BLOSC_TRACE_ERROR("`start`+`nitems` out of bounds.");
2657 return BLOSC2_ERROR_INVALID_PARAM;
2658 }
2659
2660 if (!context->special_type &&
2661 (_src + srcsize < (uint8_t *)(context->bstarts + context->nblocks))) {
2662 BLOSC_TRACE_ERROR("`bstarts` out of bounds.");
2663 return BLOSC2_ERROR_READ_BUFFER;
2664 }
2665
2666 bool memcpyed = header->flags & (uint8_t)BLOSC_MEMCPYED;
2667 if (context->special_type) {
2668 // Fake a runlen as if its a memcpyed chunk
2669 memcpyed = true;
2670 }
2671
2672 bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
2673 (context->blosc2_flags & 0x08u) && !context->special_type);
2674 if (memcpyed && !is_lazy && !context->postfilter) {
2675 // Short-circuit for (non-lazy) memcpyed or special values
2676 ntbytes = nitems * header->typesize;
2677 switch (context->special_type) {
2678 case BLOSC2_SPECIAL_VALUE:
2679 // All repeated values
2680 rc = set_values(context->typesize, _src, _dest, ntbytes);
2681 if (rc < 0) {
2682 BLOSC_TRACE_ERROR("set_values failed");
2683 return BLOSC2_ERROR_DATA;
2684 }
2685 break;
2686 case BLOSC2_SPECIAL_NAN:
2687 rc = set_nans(context->typesize, _dest, ntbytes);
2688 if (rc < 0) {
2689 BLOSC_TRACE_ERROR("set_nans failed");
2690 return BLOSC2_ERROR_DATA;
2691 }
2692 break;
2693 case BLOSC2_SPECIAL_ZERO:
2694 memset(_dest, 0, ntbytes);
2695 break;
2696 case BLOSC2_SPECIAL_UNINIT:
2697 // We do nothing here
2698 break;
2699 case BLOSC2_NO_SPECIAL:
2700 _src += context->header_overhead + start * context->typesize;
2701 memcpy(_dest, _src, ntbytes);
2702 break;
2703 default:
2704 BLOSC_TRACE_ERROR("Unhandled special value case");
2705 return -1;
2706 }
2707 return ntbytes;
2708 }
2709
2710 ebsize = header->blocksize + header->typesize * (signed)sizeof(int32_t);
2711 struct thread_context* scontext = context->serial_context;
2712 /* Resize the temporaries in serial context if needed */
2713 if (header->blocksize > scontext->tmp_blocksize) {
2714 my_free(scontext->tmp);
2715 scontext->tmp_nbytes = (size_t)4 * ebsize;
2716 scontext->tmp = my_malloc(scontext->tmp_nbytes);
2717 BLOSC_ERROR_NULL(scontext->tmp, BLOSC2_ERROR_MEMORY_ALLOC);
2718 scontext->tmp2 = scontext->tmp + ebsize;
2719 scontext->tmp3 = scontext->tmp2 + ebsize;
2720 scontext->tmp4 = scontext->tmp3 + ebsize;
2721 scontext->tmp_blocksize = (int32_t)header->blocksize;
2722 }
2723
2724 for (j = 0; j < context->nblocks; j++) {
2725 bsize = header->blocksize;
2726 leftoverblock = 0;
2727 if ((j == context->nblocks - 1) && (context->leftover > 0)) {
2728 bsize = context->leftover;
2729 leftoverblock = 1;
2730 }
2731
2732 /* Compute start & stop for each block */
2733 startb = start * header->typesize - j * header->blocksize;
2734 stopb = stop * header->typesize - j * header->blocksize;
2735 if (stopb <= 0) {
2736 // We can exit as soon as this block is beyond stop
2737 break;
2738 }
2739 if (startb >= header->blocksize) {
2740 continue;
2741 }
2742 if (startb < 0) {
2743 startb = 0;
2744 }
2745 if (stopb > header->blocksize) {
2746 stopb = header->blocksize;
2747 }
2748 bsize2 = stopb - startb;
2749
2750 /* Do the actual data copy */
2751 // Regular decompression. Put results in tmp2.
2752 // If the block is aligned and the worst case fits in destination, let's avoid a copy
2753 bool get_single_block = ((startb == 0) && (bsize == nitems * header->typesize));
2754 uint8_t* tmp2 = get_single_block ? dest : scontext->tmp2;
2755
2756 // If memcpyed we don't have a bstarts section (because it is not needed)
2757 int32_t src_offset = memcpyed ?
2758 context->header_overhead + j * bsize : sw32_(context->bstarts + j);
2759
2760 int cbytes = blosc_d(context->serial_context, bsize, leftoverblock, memcpyed,
2761 src, srcsize, src_offset, j,
2762 tmp2, 0, scontext->tmp, scontext->tmp3);
2763 if (cbytes < 0) {
2764 ntbytes = cbytes;
2765 break;
2766 }
2767 if (!get_single_block) {
2768 /* Copy to destination */
2769 memcpy((uint8_t *) dest + ntbytes, tmp2 + startb, (unsigned int) bsize2);
2770 }
2771 ntbytes += bsize2;
2772 }
2773
2774 return ntbytes;
2775 }
2776
blosc2_getitem(const void * src,int32_t srcsize,int start,int nitems,void * dest,int32_t destsize)2777 int blosc2_getitem(const void* src, int32_t srcsize, int start, int nitems, void* dest, int32_t destsize) {
2778 blosc2_context context;
2779 int result;
2780
2781 /* Minimally populate the context */
2782 memset(&context, 0, sizeof(blosc2_context));
2783
2784 context.schunk = g_schunk;
2785 context.nthreads = 1; // force a serial decompression; fixes #95
2786
2787 /* Call the actual getitem function */
2788 result = blosc2_getitem_ctx(&context, src, srcsize, start, nitems, dest, destsize);
2789
2790 /* Release resources */
2791 if (context.serial_context != NULL) {
2792 free_thread_context(context.serial_context);
2793 }
2794 return result;
2795 }
2796
/* Specific routine optimized for decompressing a small number of
   items out of a compressed chunk. Public non-contextual API.
   The sizes of `src` and `dest` are not known here, so both are passed
   to blosc2_getitem() as INT32_MAX. */
int blosc_getitem(const void* src, int start, int nitems, void* dest) {
  const int32_t unknown_size = INT32_MAX;
  return blosc2_getitem(src, unknown_size, start, nitems, dest, unknown_size);
}
2802
blosc2_getitem_ctx(blosc2_context * context,const void * src,int32_t srcsize,int start,int nitems,void * dest,int32_t destsize)2803 int blosc2_getitem_ctx(blosc2_context* context, const void* src, int32_t srcsize,
2804 int start, int nitems, void* dest, int32_t destsize) {
2805 blosc_header header;
2806 int result;
2807
2808 /* Minimally populate the context */
2809 result = read_chunk_header((uint8_t *) src, srcsize, true, &header);
2810 if (result < 0) {
2811 return result;
2812 }
2813
2814 context->src = src;
2815 context->srcsize = srcsize;
2816 context->dest = dest;
2817 context->destsize = destsize;
2818
2819 result = blosc2_initialize_context_from_header(context, &header);
2820 if (result < 0) {
2821 return result;
2822 }
2823
2824 if (context->serial_context == NULL) {
2825 context->serial_context = create_thread_context(context, 0);
2826 }
2827 BLOSC_ERROR_NULL(context->serial_context, BLOSC2_ERROR_THREAD_CREATE);
2828 /* Call the actual getitem function */
2829 result = _blosc_getitem(context, &header, src, srcsize, start, nitems, dest, destsize);
2830
2831 return result;
2832 }
2833
2834 /* execute single compression/decompression job for a single thread_context */
t_blosc_do_job(void * ctxt)2835 static void t_blosc_do_job(void *ctxt)
2836 {
2837 struct thread_context* thcontext = (struct thread_context*)ctxt;
2838 blosc2_context* context = thcontext->parent_context;
2839 int32_t cbytes;
2840 int32_t ntdest;
2841 int32_t tblocks; /* number of blocks per thread */
2842 int32_t tblock; /* limit block on a thread */
2843 int32_t nblock_; /* private copy of nblock */
2844 int32_t bsize;
2845 int32_t leftoverblock;
2846 /* Parameters for threads */
2847 int32_t blocksize;
2848 int32_t ebsize;
2849 int32_t srcsize;
2850 bool compress = context->do_compress != 0;
2851 int32_t maxbytes;
2852 int32_t nblocks;
2853 int32_t leftover;
2854 int32_t leftover2;
2855 int32_t* bstarts;
2856 const uint8_t* src;
2857 uint8_t* dest;
2858 uint8_t* tmp;
2859 uint8_t* tmp2;
2860 uint8_t* tmp3;
2861
2862 /* Get parameters for this thread before entering the main loop */
2863 blocksize = context->blocksize;
2864 ebsize = blocksize + context->typesize * sizeof(int32_t);
2865 maxbytes = context->destsize;
2866 nblocks = context->nblocks;
2867 leftover = context->leftover;
2868 bstarts = context->bstarts;
2869 src = context->src;
2870 srcsize = context->srcsize;
2871 dest = context->dest;
2872
2873 /* Resize the temporaries if needed */
2874 if (blocksize > thcontext->tmp_blocksize) {
2875 my_free(thcontext->tmp);
2876 thcontext->tmp_nbytes = (size_t) 4 * ebsize;
2877 thcontext->tmp = my_malloc(thcontext->tmp_nbytes);
2878 thcontext->tmp2 = thcontext->tmp + ebsize;
2879 thcontext->tmp3 = thcontext->tmp2 + ebsize;
2880 thcontext->tmp4 = thcontext->tmp3 + ebsize;
2881 thcontext->tmp_blocksize = blocksize;
2882 }
2883
2884 tmp = thcontext->tmp;
2885 tmp2 = thcontext->tmp2;
2886 tmp3 = thcontext->tmp3;
2887
2888 // Determine whether we can do a static distribution of workload among different threads
2889 bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
2890 if (!context->do_compress && context->special_type) {
2891 // Fake a runlen as if its a memcpyed chunk
2892 memcpyed = true;
2893 }
2894
2895 bool static_schedule = (!compress || memcpyed) && context->block_maskout == NULL;
2896 if (static_schedule) {
2897 /* Blocks per thread */
2898 tblocks = nblocks / context->nthreads;
2899 leftover2 = nblocks % context->nthreads;
2900 tblocks = (leftover2 > 0) ? tblocks + 1 : tblocks;
2901 nblock_ = thcontext->tid * tblocks;
2902 tblock = nblock_ + tblocks;
2903 if (tblock > nblocks) {
2904 tblock = nblocks;
2905 }
2906 }
2907 else {
2908 // Use dynamic schedule via a queue. Get the next block.
2909 pthread_mutex_lock(&context->count_mutex);
2910 context->thread_nblock++;
2911 nblock_ = context->thread_nblock;
2912 pthread_mutex_unlock(&context->count_mutex);
2913 tblock = nblocks;
2914 }
2915
2916 /* Loop over blocks */
2917 leftoverblock = 0;
2918 while ((nblock_ < tblock) && (context->thread_giveup_code > 0)) {
2919 bsize = blocksize;
2920 if (nblock_ == (nblocks - 1) && (leftover > 0)) {
2921 bsize = leftover;
2922 leftoverblock = 1;
2923 }
2924 if (compress) {
2925 if (memcpyed) {
2926 if (!context->prefilter) {
2927 /* We want to memcpy only */
2928 memcpy(dest + context->header_overhead + nblock_ * blocksize,
2929 src + nblock_ * blocksize, (unsigned int) bsize);
2930 cbytes = (int32_t) bsize;
2931 }
2932 else {
2933 /* Only the prefilter has to be executed, and this is done in blosc_c().
2934 * However, no further actions are needed, so we can put the result
2935 * directly in dest. */
2936 cbytes = blosc_c(thcontext, bsize, leftoverblock, 0,
2937 ebsize, src, nblock_ * blocksize,
2938 dest + context->header_overhead + nblock_ * blocksize,
2939 tmp, tmp3);
2940 }
2941 }
2942 else {
2943 /* Regular compression */
2944 cbytes = blosc_c(thcontext, bsize, leftoverblock, 0,
2945 ebsize, src, nblock_ * blocksize, tmp2, tmp, tmp3);
2946 }
2947 }
2948 else {
2949 /* Regular decompression */
2950 if (context->special_type == BLOSC2_NO_SPECIAL && !memcpyed &&
2951 (srcsize < (int32_t)(context->header_overhead + (sizeof(int32_t) * nblocks)))) {
2952 /* Not enough input to read all `bstarts` */
2953 cbytes = -1;
2954 }
2955 else {
2956 // If memcpyed we don't have a bstarts section (because it is not needed)
2957 int32_t src_offset = memcpyed ?
2958 context->header_overhead + nblock_ * blocksize : sw32_(bstarts + nblock_);
2959 cbytes = blosc_d(thcontext, bsize, leftoverblock, memcpyed,
2960 src, srcsize, src_offset, nblock_,
2961 dest, nblock_ * blocksize, tmp, tmp2);
2962 }
2963 }
2964
2965 /* Check whether current thread has to giveup */
2966 if (context->thread_giveup_code <= 0) {
2967 break;
2968 }
2969
2970 /* Check results for the compressed/decompressed block */
2971 if (cbytes < 0) { /* compr/decompr failure */
2972 /* Set giveup_code error */
2973 pthread_mutex_lock(&context->count_mutex);
2974 context->thread_giveup_code = cbytes;
2975 pthread_mutex_unlock(&context->count_mutex);
2976 break;
2977 }
2978
2979 if (compress && !memcpyed) {
2980 /* Start critical section */
2981 pthread_mutex_lock(&context->count_mutex);
2982 ntdest = context->output_bytes;
2983 // Note: do not use a typical local dict_training variable here
2984 // because it is probably cached from previous calls if the number of
2985 // threads does not change (the usual thing).
2986 if (!(context->use_dict && context->dict_cdict == NULL)) {
2987 _sw32(bstarts + nblock_, (int32_t) ntdest);
2988 }
2989
2990 if ((cbytes == 0) || (ntdest + cbytes > maxbytes)) {
2991 context->thread_giveup_code = 0; /* uncompressible buf */
2992 pthread_mutex_unlock(&context->count_mutex);
2993 break;
2994 }
2995 context->thread_nblock++;
2996 nblock_ = context->thread_nblock;
2997 context->output_bytes += cbytes;
2998 pthread_mutex_unlock(&context->count_mutex);
2999 /* End of critical section */
3000
3001 /* Copy the compressed buffer to destination */
3002 memcpy(dest + ntdest, tmp2, (unsigned int) cbytes);
3003 }
3004 else if (static_schedule) {
3005 nblock_++;
3006 }
3007 else {
3008 pthread_mutex_lock(&context->count_mutex);
3009 context->thread_nblock++;
3010 nblock_ = context->thread_nblock;
3011 context->output_bytes += cbytes;
3012 pthread_mutex_unlock(&context->count_mutex);
3013 }
3014
3015 } /* closes while (nblock_) */
3016
3017 if (static_schedule) {
3018 context->output_bytes = context->sourcesize;
3019 if (compress) {
3020 context->output_bytes += context->header_overhead;
3021 }
3022 }
3023
3024 }
3025
3026 /* Decompress & unshuffle several blocks in a single thread */
t_blosc(void * ctxt)3027 static void* t_blosc(void* ctxt) {
3028 struct thread_context* thcontext = (struct thread_context*)ctxt;
3029 blosc2_context* context = thcontext->parent_context;
3030 #ifdef BLOSC_POSIX_BARRIERS
3031 int rc;
3032 #endif
3033
3034 while (1) {
3035 /* Synchronization point for all threads (wait for initialization) */
3036 WAIT_INIT(NULL, context);
3037
3038 if (context->end_threads) {
3039 break;
3040 }
3041
3042 t_blosc_do_job(ctxt);
3043
3044 /* Meeting point for all threads (wait for finalization) */
3045 WAIT_FINISH(NULL, context);
3046 }
3047
3048 /* Cleanup our working space and context */
3049 free_thread_context(thcontext);
3050
3051 return (NULL);
3052 }
3053
3054
init_threadpool(blosc2_context * context)3055 int init_threadpool(blosc2_context *context) {
3056 int32_t tid;
3057 int rc2;
3058
3059 /* Initialize mutex and condition variable objects */
3060 pthread_mutex_init(&context->count_mutex, NULL);
3061 pthread_mutex_init(&context->delta_mutex, NULL);
3062 pthread_cond_init(&context->delta_cv, NULL);
3063
3064 /* Set context thread sentinels */
3065 context->thread_giveup_code = 1;
3066 context->thread_nblock = -1;
3067
3068 /* Barrier initialization */
3069 #ifdef BLOSC_POSIX_BARRIERS
3070 pthread_barrier_init(&context->barr_init, NULL, context->nthreads + 1);
3071 pthread_barrier_init(&context->barr_finish, NULL, context->nthreads + 1);
3072 #else
3073 pthread_mutex_init(&context->count_threads_mutex, NULL);
3074 pthread_cond_init(&context->count_threads_cv, NULL);
3075 context->count_threads = 0; /* Reset threads counter */
3076 #endif
3077
3078 if (threads_callback) {
3079 /* Create thread contexts to store data for callback threads */
3080 context->thread_contexts = (struct thread_context *)my_malloc(
3081 context->nthreads * sizeof(struct thread_context));
3082 BLOSC_ERROR_NULL(context->thread_contexts, BLOSC2_ERROR_MEMORY_ALLOC);
3083 for (tid = 0; tid < context->nthreads; tid++)
3084 init_thread_context(context->thread_contexts + tid, context, tid);
3085 }
3086 else {
3087 #if !defined(_WIN32)
3088 /* Initialize and set thread detached attribute */
3089 pthread_attr_init(&context->ct_attr);
3090 pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
3091 #endif
3092
3093 /* Make space for thread handlers */
3094 context->threads = (pthread_t*)my_malloc(
3095 context->nthreads * sizeof(pthread_t));
3096 BLOSC_ERROR_NULL(context->threads, BLOSC2_ERROR_MEMORY_ALLOC);
3097 /* Finally, create the threads */
3098 for (tid = 0; tid < context->nthreads; tid++) {
3099 /* Create a thread context (will destroy when finished) */
3100 struct thread_context *thread_context = create_thread_context(context, tid);
3101 BLOSC_ERROR_NULL(thread_context, BLOSC2_ERROR_THREAD_CREATE);
3102 #if !defined(_WIN32)
3103 rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc,
3104 (void*)thread_context);
3105 #else
3106 rc2 = pthread_create(&context->threads[tid], NULL, t_blosc,
3107 (void *)thread_context);
3108 #endif
3109 if (rc2) {
3110 BLOSC_TRACE_ERROR("Return code from pthread_create() is %d.\n"
3111 "\tError detail: %s\n", rc2, strerror(rc2));
3112 return BLOSC2_ERROR_THREAD_CREATE;
3113 }
3114 }
3115 }
3116
3117 /* We have now started/initialized the threads */
3118 context->threads_started = context->nthreads;
3119 context->new_nthreads = context->nthreads;
3120
3121 return 0;
3122 }
3123
blosc_get_nthreads(void)3124 int16_t blosc_get_nthreads(void)
3125 {
3126 return g_nthreads;
3127 }
3128
blosc_set_nthreads(int16_t nthreads_new)3129 int16_t blosc_set_nthreads(int16_t nthreads_new) {
3130 int16_t ret = g_nthreads; /* the previous number of threads */
3131
3132 /* Check whether the library should be initialized */
3133 if (!g_initlib) blosc_init();
3134
3135 if (nthreads_new != ret) {
3136 g_nthreads = nthreads_new;
3137 g_global_context->new_nthreads = nthreads_new;
3138 check_nthreads(g_global_context);
3139 }
3140
3141 return ret;
3142 }
3143
3144
blosc_get_compressor(void)3145 const char* blosc_get_compressor(void)
3146 {
3147 const char* compname;
3148 blosc_compcode_to_compname(g_compressor, &compname);
3149
3150 return compname;
3151 }
3152
blosc_set_compressor(const char * compname)3153 int blosc_set_compressor(const char* compname) {
3154 int code = blosc_compname_to_compcode(compname);
3155 if (code >= BLOSC_LAST_CODEC) {
3156 BLOSC_TRACE_ERROR("User defined codecs cannot be set here. Use Blosc2 mechanism instead.");
3157 return -1;
3158 }
3159 g_compressor = code;
3160
3161 /* Check whether the library should be initialized */
3162 if (!g_initlib) blosc_init();
3163
3164 return code;
3165 }
3166
blosc_set_delta(int dodelta)3167 void blosc_set_delta(int dodelta) {
3168
3169 g_delta = dodelta;
3170
3171 /* Check whether the library should be initialized */
3172 if (!g_initlib) blosc_init();
3173
3174 }
3175
blosc_list_compressors(void)3176 const char* blosc_list_compressors(void) {
3177 static int compressors_list_done = 0;
3178 static char ret[256];
3179
3180 if (compressors_list_done) return ret;
3181 ret[0] = '\0';
3182 strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
3183 strcat(ret, ",");
3184 strcat(ret, BLOSC_LZ4_COMPNAME);
3185 strcat(ret, ",");
3186 strcat(ret, BLOSC_LZ4HC_COMPNAME);
3187 #if defined(HAVE_ZLIB)
3188 strcat(ret, ",");
3189 strcat(ret, BLOSC_ZLIB_COMPNAME);
3190 #endif /* HAVE_ZLIB */
3191 #if defined(HAVE_ZSTD)
3192 strcat(ret, ",");
3193 strcat(ret, BLOSC_ZSTD_COMPNAME);
3194 #endif /* HAVE_ZSTD */
3195 compressors_list_done = 1;
3196 return ret;
3197 }
3198
3199
blosc_get_version_string(void)3200 const char* blosc_get_version_string(void) {
3201 return BLOSC_VERSION_STRING;
3202 }
3203
3204
blosc_get_complib_info(const char * compname,char ** complib,char ** version)3205 int blosc_get_complib_info(const char* compname, char** complib, char** version) {
3206 int clibcode;
3207 const char* clibname;
3208 const char* clibversion = "unknown";
3209 char sbuffer[256];
3210
3211 clibcode = compname_to_clibcode(compname);
3212 clibname = clibcode_to_clibname(clibcode);
3213
3214 /* complib version */
3215 if (clibcode == BLOSC_BLOSCLZ_LIB) {
3216 clibversion = BLOSCLZ_VERSION_STRING;
3217 }
3218 else if (clibcode == BLOSC_LZ4_LIB) {
3219 sprintf(sbuffer, "%d.%d.%d",
3220 LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
3221 clibversion = sbuffer;
3222 }
3223 #if defined(HAVE_ZLIB)
3224 else if (clibcode == BLOSC_ZLIB_LIB) {
3225 clibversion = ZLIB_VERSION;
3226 }
3227 #endif /* HAVE_ZLIB */
3228 #if defined(HAVE_ZSTD)
3229 else if (clibcode == BLOSC_ZSTD_LIB) {
3230 sprintf(sbuffer, "%d.%d.%d",
3231 ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
3232 clibversion = sbuffer;
3233 }
3234 #endif /* HAVE_ZSTD */
3235
3236 #ifdef _MSC_VER
3237 *complib = _strdup(clibname);
3238 *version = _strdup(clibversion);
3239 #else
3240 *complib = strdup(clibname);
3241 *version = strdup(clibversion);
3242 #endif
3243 return clibcode;
3244 }
3245
/* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer as
 * size_t values.  Thin wrapper over blosc2_cbuffer_sizes(); on a header
 * read error that function reports zeros, which are forwarded unchanged. */
void blosc_cbuffer_sizes(const void* cbuffer, size_t* nbytes, size_t* cbytes, size_t* blocksize) {
  int32_t sizes[3];

  blosc2_cbuffer_sizes(cbuffer, &sizes[0], &sizes[1], &sizes[2]);
  *nbytes = (size_t)sizes[0];
  *cbytes = (size_t)sizes[1];
  *blocksize = (size_t)sizes[2];
}
3254
blosc2_cbuffer_sizes(const void * cbuffer,int32_t * nbytes,int32_t * cbytes,int32_t * blocksize)3255 int blosc2_cbuffer_sizes(const void* cbuffer, int32_t* nbytes, int32_t* cbytes, int32_t* blocksize) {
3256 blosc_header header;
3257 int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3258 if (rc < 0) {
3259 /* Return zeros if error reading header */
3260 memset(&header, 0, sizeof(header));
3261 }
3262
3263 /* Read the interesting values */
3264 if (nbytes != NULL)
3265 *nbytes = header.nbytes;
3266 if (cbytes != NULL)
3267 *cbytes = header.cbytes;
3268 if (blocksize != NULL)
3269 *blocksize = header.blocksize;
3270 return rc;
3271 }
3272
blosc_cbuffer_validate(const void * cbuffer,size_t cbytes,size_t * nbytes)3273 int blosc_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) {
3274 int32_t header_cbytes;
3275 int32_t header_nbytes;
3276 if (cbytes < BLOSC_MIN_HEADER_LENGTH) {
3277 /* Compressed data should contain enough space for header */
3278 *nbytes = 0;
3279 return BLOSC2_ERROR_WRITE_BUFFER;
3280 }
3281 int rc = blosc2_cbuffer_sizes(cbuffer, &header_nbytes, &header_cbytes, NULL);
3282 if (rc < 0) {
3283 *nbytes = 0;
3284 return rc;
3285 }
3286 *nbytes = header_nbytes;
3287 if (header_cbytes != cbytes) {
3288 /* Compressed size from header does not match `cbytes` */
3289 *nbytes = 0;
3290 return BLOSC2_ERROR_INVALID_HEADER;
3291 }
3292 if (*nbytes > BLOSC_MAX_BUFFERSIZE) {
3293 /* Uncompressed size is larger than allowed */
3294 *nbytes = 0;
3295 return BLOSC2_ERROR_MEMORY_ALLOC;
3296 }
3297 return 0;
3298 }
3299
3300 /* Return `typesize` and `flags` from a compressed buffer. */
blosc_cbuffer_metainfo(const void * cbuffer,size_t * typesize,int * flags)3301 void blosc_cbuffer_metainfo(const void* cbuffer, size_t* typesize, int* flags) {
3302 blosc_header header;
3303 int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3304 if (rc < 0) {
3305 *typesize = *flags = 0;
3306 return;
3307 }
3308
3309 /* Read the interesting values */
3310 *flags = header.flags;
3311 *typesize = header.typesize;
3312 }
3313
3314
3315 /* Return version information from a compressed buffer. */
blosc_cbuffer_versions(const void * cbuffer,int * version,int * versionlz)3316 void blosc_cbuffer_versions(const void* cbuffer, int* version, int* versionlz) {
3317 blosc_header header;
3318 int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3319 if (rc < 0) {
3320 *version = *versionlz = 0;
3321 return;
3322 }
3323
3324 /* Read the version info */
3325 *version = header.version;
3326 *versionlz = header.versionlz;
3327 }
3328
3329
3330 /* Return the compressor library/format used in a compressed buffer. */
blosc_cbuffer_complib(const void * cbuffer)3331 const char* blosc_cbuffer_complib(const void* cbuffer) {
3332 blosc_header header;
3333 int clibcode;
3334 const char* complib;
3335 int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3336 if (rc < 0) {
3337 return NULL;
3338 }
3339
3340 /* Read the compressor format/library info */
3341 clibcode = (header.flags & 0xe0) >> 5;
3342 complib = clibcode_to_clibname(clibcode);
3343 return complib;
3344 }
3345
3346
3347 /* Get the internal blocksize to be used during compression. 0 means
3348 that an automatic blocksize is computed internally. */
blosc_get_blocksize(void)3349 int blosc_get_blocksize(void)
3350 {
3351 return (int)g_force_blocksize;
3352 }
3353
3354
3355 /* Force the use of a specific blocksize. If 0, an automatic
3356 blocksize will be used (the default). */
blosc_set_blocksize(size_t size)3357 void blosc_set_blocksize(size_t size) {
3358 g_force_blocksize = (int32_t)size;
3359 }
3360
3361
3362 /* Set pointer to super-chunk. If NULL, no super-chunk will be
3363 reachable (the default). */
blosc_set_schunk(blosc2_schunk * schunk)3364 void blosc_set_schunk(blosc2_schunk* schunk) {
3365 g_schunk = schunk;
3366 g_global_context->schunk = schunk;
3367 }
3368
/* Library-wide blosc2_io pointer, NULL until assigned.  NOTE(review):
   presumably the default I/O backend used when none is supplied explicitly
   — confirm against its users elsewhere in the library. */
blosc2_io *blosc2_io_global = NULL;
3370
blosc_init(void)3371 void blosc_init(void) {
3372 /* Return if Blosc is already initialized */
3373 if (g_initlib) return;
3374
3375 g_ncodecs = 0;
3376 g_nfilters = 0;
3377
3378 #if defined(HAVE_PLUGINS)
3379 #include "blosc2/blosc2-common.h"
3380 #include "blosc2/blosc2-stdio.h"
3381 register_codecs();
3382 register_filters();
3383 #endif
3384 pthread_mutex_init(&global_comp_mutex, NULL);
3385 /* Create a global context */
3386 g_global_context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
3387 memset(g_global_context, 0, sizeof(blosc2_context));
3388 g_global_context->nthreads = g_nthreads;
3389 g_global_context->new_nthreads = g_nthreads;
3390 g_initlib = 1;
3391 }
3392
3393
blosc_destroy(void)3394 void blosc_destroy(void) {
3395 /* Return if Blosc is not initialized */
3396 if (!g_initlib) return;
3397
3398 g_initlib = 0;
3399 blosc2_free_ctx(g_global_context);
3400
3401 pthread_mutex_destroy(&global_comp_mutex);
3402
3403 }
3404
3405
release_threadpool(blosc2_context * context)3406 int release_threadpool(blosc2_context *context) {
3407 int32_t t;
3408 void* status;
3409 int rc;
3410
3411 if (context->threads_started > 0) {
3412 if (threads_callback) {
3413 /* free context data for user-managed threads */
3414 for (t=0; t<context->threads_started; t++)
3415 destroy_thread_context(context->thread_contexts + t);
3416 my_free(context->thread_contexts);
3417 }
3418 else {
3419 /* Tell all existing threads to finish */
3420 context->end_threads = 1;
3421 WAIT_INIT(-1, context);
3422
3423 /* Join exiting threads */
3424 for (t = 0; t < context->threads_started; t++) {
3425 rc = pthread_join(context->threads[t], &status);
3426 if (rc) {
3427 BLOSC_TRACE_ERROR("Return code from pthread_join() is %d\n"
3428 "\tError detail: %s.", rc, strerror(rc));
3429 }
3430 }
3431
3432 /* Thread attributes */
3433 #if !defined(_WIN32)
3434 pthread_attr_destroy(&context->ct_attr);
3435 #endif
3436
3437 /* Release thread handlers */
3438 my_free(context->threads);
3439 }
3440
3441 /* Release mutex and condition variable objects */
3442 pthread_mutex_destroy(&context->count_mutex);
3443 pthread_mutex_destroy(&context->delta_mutex);
3444 pthread_cond_destroy(&context->delta_cv);
3445
3446 /* Barriers */
3447 #ifdef BLOSC_POSIX_BARRIERS
3448 pthread_barrier_destroy(&context->barr_init);
3449 pthread_barrier_destroy(&context->barr_finish);
3450 #else
3451 pthread_mutex_destroy(&context->count_threads_mutex);
3452 pthread_cond_destroy(&context->count_threads_cv);
3453 context->count_threads = 0; /* Reset threads counter */
3454 #endif
3455
3456 /* Reset flags and counters */
3457 context->end_threads = 0;
3458 context->threads_started = 0;
3459 }
3460
3461
3462 return 0;
3463 }
3464
blosc_free_resources(void)3465 int blosc_free_resources(void) {
3466 /* Return if Blosc is not initialized */
3467 if (!g_initlib) return BLOSC2_ERROR_FAILURE;
3468
3469 return release_threadpool(g_global_context);
3470 }
3471
3472
3473 /* Contexts */
3474
3475 /* Create a context for compression */
blosc2_create_cctx(blosc2_cparams cparams)3476 blosc2_context* blosc2_create_cctx(blosc2_cparams cparams) {
3477 blosc2_context* context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
3478 BLOSC_ERROR_NULL(context, NULL);
3479
3480 /* Populate the context, using zeros as default values */
3481 memset(context, 0, sizeof(blosc2_context));
3482 context->do_compress = 1; /* meant for compression */
3483 context->compcode = cparams.compcode;
3484 context->compcode_meta = cparams.compcode_meta;
3485 context->clevel = cparams.clevel;
3486 context->use_dict = cparams.use_dict;
3487 context->typesize = cparams.typesize;
3488 for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
3489 context->filters[i] = cparams.filters[i];
3490 context->filters_meta[i] = cparams.filters_meta[i];
3491
3492 if (context->filters[i] >= BLOSC_LAST_FILTER && context->filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
3493 BLOSC_TRACE_ERROR("filter (%d) is not yet defined",
3494 context->filters[i]);
3495 free(context);
3496 return NULL;
3497 }
3498 if (context->filters[i] > BLOSC_LAST_REGISTERED_FILTER && context->filters[i] <= BLOSC2_GLOBAL_REGISTERED_FILTERS_STOP) {
3499 BLOSC_TRACE_ERROR("filter (%d) is not yet defined",
3500 context->filters[i]);
3501 free(context);
3502 return NULL;
3503 }
3504 }
3505
3506 context->nthreads = cparams.nthreads;
3507 context->new_nthreads = context->nthreads;
3508 context->blocksize = cparams.blocksize;
3509 context->splitmode = cparams.splitmode;
3510 context->threads_started = 0;
3511 context->schunk = cparams.schunk;
3512
3513 if (cparams.prefilter != NULL) {
3514 context->prefilter = cparams.prefilter;
3515 context->preparams = (blosc2_prefilter_params*)my_malloc(sizeof(blosc2_prefilter_params));
3516 BLOSC_ERROR_NULL(context->preparams, NULL);
3517 memcpy(context->preparams, cparams.preparams, sizeof(blosc2_prefilter_params));
3518 }
3519
3520 if (cparams.udbtune == NULL) {
3521 context->udbtune = &BTUNE_DEFAULTS;
3522 } else {
3523 context->udbtune = cparams.udbtune;
3524 }
3525
3526 return context;
3527 }
3528
3529 /* Create a context for decompression */
blosc2_create_dctx(blosc2_dparams dparams)3530 blosc2_context* blosc2_create_dctx(blosc2_dparams dparams) {
3531 blosc2_context* context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
3532 BLOSC_ERROR_NULL(context, NULL);
3533
3534 /* Populate the context, using zeros as default values */
3535 memset(context, 0, sizeof(blosc2_context));
3536 context->do_compress = 0; /* Meant for decompression */
3537 context->nthreads = dparams.nthreads;
3538 context->new_nthreads = context->nthreads;
3539 context->threads_started = 0;
3540 context->block_maskout = NULL;
3541 context->block_maskout_nitems = 0;
3542 context->schunk = dparams.schunk;
3543
3544 if (dparams.postfilter != NULL) {
3545 context->postfilter = dparams.postfilter;
3546 context->postparams = (blosc2_postfilter_params*)my_malloc(sizeof(blosc2_postfilter_params));
3547 BLOSC_ERROR_NULL(context->postparams, NULL);
3548 memcpy(context->postparams, dparams.postparams, sizeof(blosc2_postfilter_params));
3549 }
3550
3551 return context;
3552 }
3553
3554
blosc2_free_ctx(blosc2_context * context)3555 void blosc2_free_ctx(blosc2_context* context) {
3556 release_threadpool(context);
3557 if (context->serial_context != NULL) {
3558 free_thread_context(context->serial_context);
3559 }
3560 if (context->dict_cdict != NULL) {
3561 #ifdef HAVE_ZSTD
3562 ZSTD_freeCDict(context->dict_cdict);
3563 #endif
3564 }
3565 if (context->dict_ddict != NULL) {
3566 #ifdef HAVE_ZSTD
3567 ZSTD_freeDDict(context->dict_ddict);
3568 #endif
3569 }
3570 if (context->btune != NULL) {
3571 context->udbtune->btune_free(context);
3572 }
3573 if (context->prefilter != NULL) {
3574 my_free(context->preparams);
3575 }
3576 if (context->postfilter != NULL) {
3577 my_free(context->postparams);
3578 }
3579
3580 if (context->block_maskout != NULL) {
3581 free(context->block_maskout);
3582 }
3583 my_free(context);
3584 }
3585
3586
blosc2_ctx_get_cparams(blosc2_context * ctx,blosc2_cparams * cparams)3587 int blosc2_ctx_get_cparams(blosc2_context *ctx, blosc2_cparams *cparams) {
3588 cparams->compcode = ctx->compcode;
3589 cparams->compcode_meta = ctx->compcode_meta;
3590 cparams->clevel = ctx->clevel;
3591 cparams->use_dict = ctx->use_dict;
3592 cparams->typesize = ctx->typesize;
3593 cparams->nthreads = ctx->nthreads;
3594 cparams->blocksize = ctx->blocksize;
3595 cparams->splitmode = ctx->splitmode;
3596 cparams->schunk = ctx->schunk;
3597 for (int i = 0; i < BLOSC2_MAX_FILTERS; ++i) {
3598 cparams->filters[i] = ctx->filters[i];
3599 cparams->filters_meta[i] = ctx->filters_meta[i];
3600 }
3601 cparams->prefilter = ctx->prefilter;
3602 cparams->preparams = ctx->preparams;
3603 cparams->udbtune = ctx->udbtune;
3604
3605 return BLOSC2_ERROR_SUCCESS;
3606 }
3607
3608
blosc2_ctx_get_dparams(blosc2_context * ctx,blosc2_dparams * dparams)3609 int blosc2_ctx_get_dparams(blosc2_context *ctx, blosc2_dparams *dparams) {
3610 dparams->nthreads = ctx->nthreads;
3611 dparams->schunk = ctx->schunk;
3612 dparams->postfilter = ctx->postfilter;
3613 dparams->postparams = ctx->postparams;
3614
3615 return BLOSC2_ERROR_SUCCESS;
3616 }
3617
3618
3619 /* Set a maskout in decompression context */
blosc2_set_maskout(blosc2_context * ctx,bool * maskout,int nblocks)3620 int blosc2_set_maskout(blosc2_context *ctx, bool *maskout, int nblocks) {
3621
3622 if (ctx->block_maskout != NULL) {
3623 // Get rid of a possible mask here
3624 free(ctx->block_maskout);
3625 }
3626
3627 bool *maskout_ = malloc(nblocks);
3628 BLOSC_ERROR_NULL(maskout_, BLOSC2_ERROR_MEMORY_ALLOC);
3629 memcpy(maskout_, maskout, nblocks);
3630 ctx->block_maskout = maskout_;
3631 ctx->block_maskout_nitems = nblocks;
3632
3633 return 0;
3634 }
3635
3636
3637 /* Create a chunk made of zeros */
blosc2_chunk_zeros(blosc2_cparams cparams,const size_t nbytes,void * dest,size_t destsize)3638 int blosc2_chunk_zeros(blosc2_cparams cparams, const size_t nbytes, void* dest, size_t destsize) {
3639 if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
3640 BLOSC_TRACE_ERROR("dest buffer is not long enough");
3641 return BLOSC2_ERROR_DATA;
3642 }
3643
3644 if (nbytes % cparams.typesize) {
3645 BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
3646 return BLOSC2_ERROR_DATA;
3647 }
3648
3649 blosc_header header;
3650 blosc2_context* context = blosc2_create_cctx(cparams);
3651
3652 int error = initialize_context_compression(
3653 context, NULL, nbytes, dest, destsize,
3654 context->clevel, context->filters, context->filters_meta,
3655 context->typesize, context->compcode, context->blocksize,
3656 context->new_nthreads, context->nthreads,
3657 context->udbtune, context->btune, context->schunk);
3658 if (error <= 0) {
3659 blosc2_free_ctx(context);
3660 return error;
3661 }
3662
3663 memset(&header, 0, sizeof(header));
3664 header.version = BLOSC_VERSION_FORMAT;
3665 header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
3666 header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE; // extended header
3667 header.typesize = context->typesize;
3668 header.nbytes = (int32_t)nbytes;
3669 header.blocksize = context->blocksize;
3670 header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
3671 header.blosc2_flags = BLOSC2_SPECIAL_ZERO << 4; // mark chunk as all zeros
3672 memcpy((uint8_t *)dest, &header, sizeof(header));
3673
3674 blosc2_free_ctx(context);
3675
3676 return BLOSC_EXTENDED_HEADER_LENGTH;
3677 }
3678
3679
3680 /* Create a chunk made of uninitialized values */
blosc2_chunk_uninit(blosc2_cparams cparams,const size_t nbytes,void * dest,size_t destsize)3681 int blosc2_chunk_uninit(blosc2_cparams cparams, const size_t nbytes, void* dest, size_t destsize) {
3682 if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
3683 BLOSC_TRACE_ERROR("dest buffer is not long enough");
3684 return BLOSC2_ERROR_DATA;
3685 }
3686
3687 if (nbytes % cparams.typesize) {
3688 BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
3689 return BLOSC2_ERROR_DATA;
3690 }
3691
3692 blosc_header header;
3693 blosc2_context* context = blosc2_create_cctx(cparams);
3694 int error = initialize_context_compression(
3695 context, NULL, nbytes, dest, destsize,
3696 context->clevel, context->filters, context->filters_meta,
3697 context->typesize, context->compcode, context->blocksize,
3698 context->new_nthreads, context->nthreads,
3699 context->udbtune, context->btune, context->schunk);
3700 if (error <= 0) {
3701 blosc2_free_ctx(context);
3702 return error;
3703 }
3704
3705 memset(&header, 0, sizeof(header));
3706 header.version = BLOSC_VERSION_FORMAT;
3707 header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
3708 header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE; // extended header
3709 header.typesize = context->typesize;
3710 header.nbytes = (int32_t)nbytes;
3711 header.blocksize = context->blocksize;
3712 header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
3713 header.blosc2_flags = BLOSC2_SPECIAL_UNINIT << 4; // mark chunk as unitialized
3714 memcpy((uint8_t *)dest, &header, sizeof(header));
3715
3716 blosc2_free_ctx(context);
3717
3718 return BLOSC_EXTENDED_HEADER_LENGTH;
3719 }
3720
3721
3722 /* Create a chunk made of nans */
blosc2_chunk_nans(blosc2_cparams cparams,const size_t nbytes,void * dest,size_t destsize)3723 int blosc2_chunk_nans(blosc2_cparams cparams, const size_t nbytes, void* dest, size_t destsize) {
3724 if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
3725 BLOSC_TRACE_ERROR("dest buffer is not long enough");
3726 return BLOSC2_ERROR_DATA;
3727 }
3728
3729 if (nbytes % cparams.typesize) {
3730 BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
3731 return BLOSC2_ERROR_DATA;
3732 }
3733
3734 blosc_header header;
3735 blosc2_context* context = blosc2_create_cctx(cparams);
3736
3737 int error = initialize_context_compression(
3738 context, NULL, nbytes, dest, destsize,
3739 context->clevel, context->filters, context->filters_meta,
3740 context->typesize, context->compcode, context->blocksize,
3741 context->new_nthreads, context->nthreads,
3742 context->udbtune, context->btune, context->schunk);
3743 if (error <= 0) {
3744 blosc2_free_ctx(context);
3745 return error;
3746 }
3747
3748 memset(&header, 0, sizeof(header));
3749 header.version = BLOSC_VERSION_FORMAT;
3750 header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
3751 header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE; // extended header
3752 header.typesize = context->typesize;
3753 header.nbytes = (int32_t)nbytes;
3754 header.blocksize = context->blocksize;
3755 header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
3756 header.blosc2_flags = BLOSC2_SPECIAL_NAN << 4; // mark chunk as all NaNs
3757 memcpy((uint8_t *)dest, &header, sizeof(header));
3758
3759 blosc2_free_ctx(context);
3760
3761 return BLOSC_EXTENDED_HEADER_LENGTH;
3762 }
3763
3764
3765 /* Create a chunk made of repeated values */
blosc2_chunk_repeatval(blosc2_cparams cparams,const size_t nbytes,void * dest,size_t destsize,void * repeatval)3766 int blosc2_chunk_repeatval(blosc2_cparams cparams, const size_t nbytes,
3767 void* dest, size_t destsize, void* repeatval) {
3768 uint8_t typesize = cparams.typesize;
3769 if (destsize < BLOSC_EXTENDED_HEADER_LENGTH + typesize) {
3770 BLOSC_TRACE_ERROR("dest buffer is not long enough");
3771 return BLOSC2_ERROR_DATA;
3772 }
3773
3774 if (nbytes % cparams.typesize) {
3775 BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
3776 return BLOSC2_ERROR_DATA;
3777 }
3778
3779 blosc_header header;
3780 blosc2_context* context = blosc2_create_cctx(cparams);
3781
3782 int error = initialize_context_compression(
3783 context, NULL, nbytes, dest, destsize,
3784 context->clevel, context->filters, context->filters_meta,
3785 context->typesize, context->compcode, context->blocksize,
3786 context->new_nthreads, context->nthreads,
3787 context->udbtune, context->btune, context->schunk);
3788 if (error <= 0) {
3789 blosc2_free_ctx(context);
3790 return error;
3791 }
3792
3793 memset(&header, 0, sizeof(header));
3794 header.version = BLOSC_VERSION_FORMAT;
3795 header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
3796 header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE; // extended header
3797 header.typesize = (uint8_t)typesize;
3798 header.nbytes = (int32_t)nbytes;
3799 header.blocksize = context->blocksize;
3800 header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH + (int32_t)typesize;
3801 header.blosc2_flags = BLOSC2_SPECIAL_VALUE << 4; // mark chunk as all repeated value
3802 memcpy((uint8_t *)dest, &header, sizeof(header));
3803 memcpy((uint8_t *)dest + sizeof(header), repeatval, typesize);
3804
3805 blosc2_free_ctx(context);
3806
3807 return BLOSC_EXTENDED_HEADER_LENGTH + (uint8_t)typesize;
3808 }
3809
3810
3811 /* Register filters */
3812
/*
 * Append a copy of `filter` to the global filter registry (g_filters).
 *
 * Ids in [BLOSC2_GLOBAL_REGISTERED_FILTERS_START,
 * BLOSC2_USER_REGISTERED_FILTERS_STOP] are accepted; blosc2_register_filter()
 * narrows this to the user range. The struct is copied, so the caller keeps
 * ownership of `filter`.
 *
 * Returns:
 *   - BLOSC2_ERROR_SUCCESS on success;
 *   - BLOSC2_ERROR_INVALID_PARAM for a NULL filter;
 *   - BLOSC2_ERROR_CODEC_SUPPORT when the registry is full;
 *   - BLOSC2_ERROR_FAILURE for an out-of-range or already-registered id.
 */
int register_filter_private(blosc2_filter *filter) {
  BLOSC_ERROR_NULL(filter, BLOSC2_ERROR_INVALID_PARAM);
  if (g_nfilters >= UINT8_MAX) {
    BLOSC_TRACE_ERROR("Can not register more filters");
    return BLOSC2_ERROR_CODEC_SUPPORT;
  }
  if (filter->id < BLOSC2_GLOBAL_REGISTERED_FILTERS_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_FILTERS_START);
    return BLOSC2_ERROR_FAILURE;
  }
  if (filter->id > BLOSC2_USER_REGISTERED_FILTERS_STOP) {
    BLOSC_TRACE_ERROR("The id must be less or equal than %d", BLOSC2_USER_REGISTERED_FILTERS_STOP);
    return BLOSC2_ERROR_FAILURE;
  }

  // Refuse duplicate ids so lookups by id stay unambiguous.
  for (int i = 0; i < g_nfilters; ++i) {
    if (g_filters[i].id == filter->id) {
      BLOSC_TRACE_ERROR("The filter is already registered!");
      return BLOSC2_ERROR_FAILURE;
    }
  }

  blosc2_filter *filter_new = &g_filters[g_nfilters++];
  memcpy(filter_new, filter, sizeof(blosc2_filter));

  return BLOSC2_ERROR_SUCCESS;
}
3841
3842
/*
 * Public entry point for registering a user-defined filter.
 *
 * Only ids >= BLOSC2_USER_REGISTERED_FILTERS_START are accepted here; the
 * rest of the validation and the actual registration are done by
 * register_filter_private().
 *
 * Returns BLOSC2_ERROR_SUCCESS or a negative BLOSC2_ERROR_* code.
 */
int blosc2_register_filter(blosc2_filter *filter) {
  // Check before dereferencing filter->id below.
  BLOSC_ERROR_NULL(filter, BLOSC2_ERROR_INVALID_PARAM);
  if (filter->id < BLOSC2_USER_REGISTERED_FILTERS_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_USER_REGISTERED_FILTERS_START);
    return BLOSC2_ERROR_FAILURE;
  }

  return register_filter_private(filter);
}
3851
3852
3853 /* Register codecs */
3854
/*
 * Append a copy of `codec` to the global codec registry (g_codecs).
 *
 * Compcodes in [BLOSC2_GLOBAL_REGISTERED_CODECS_START,
 * BLOSC2_USER_REGISTERED_CODECS_STOP] are accepted; blosc2_register_codec()
 * narrows this to the user range. The struct is copied, so the caller keeps
 * ownership of `codec`.
 *
 * Returns:
 *   - BLOSC2_ERROR_SUCCESS on success;
 *   - BLOSC2_ERROR_INVALID_PARAM for a NULL codec;
 *   - BLOSC2_ERROR_CODEC_SUPPORT when the registry is full;
 *   - BLOSC2_ERROR_FAILURE for an out-of-range compcode;
 *   - BLOSC2_ERROR_CODEC_PARAM for an already-registered compcode.
 */
int register_codec_private(blosc2_codec *codec) {
  BLOSC_ERROR_NULL(codec, BLOSC2_ERROR_INVALID_PARAM);
  if (g_ncodecs >= UINT8_MAX) {
    BLOSC_TRACE_ERROR("Can not register more codecs");
    return BLOSC2_ERROR_CODEC_SUPPORT;
  }
  if (codec->compcode < BLOSC2_GLOBAL_REGISTERED_CODECS_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_CODECS_START);
    return BLOSC2_ERROR_FAILURE;
  }
  if (codec->compcode > BLOSC2_USER_REGISTERED_CODECS_STOP) {
    BLOSC_TRACE_ERROR("The id must be less or equal than %d", BLOSC2_USER_REGISTERED_CODECS_STOP);
    return BLOSC2_ERROR_FAILURE;
  }

  // Check if the codec is already registered
  for (int i = 0; i < g_ncodecs; ++i) {
    if (g_codecs[i].compcode == codec->compcode) {
      BLOSC_TRACE_ERROR("The codec is already registered!");
      return BLOSC2_ERROR_CODEC_PARAM;
    }
  }

  blosc2_codec *codec_new = &g_codecs[g_ncodecs++];
  memcpy(codec_new, codec, sizeof(blosc2_codec));

  return BLOSC2_ERROR_SUCCESS;
}
3883
3884
/*
 * Public entry point for registering a user-defined codec.
 *
 * Only compcodes >= BLOSC2_USER_REGISTERED_CODECS_START are accepted here;
 * the rest of the validation and the actual registration are done by
 * register_codec_private().
 *
 * Returns BLOSC2_ERROR_SUCCESS or a negative BLOSC2_ERROR_* code.
 */
int blosc2_register_codec(blosc2_codec *codec) {
  // Check before dereferencing codec->compcode below.
  BLOSC_ERROR_NULL(codec, BLOSC2_ERROR_INVALID_PARAM);
  if (codec->compcode < BLOSC2_USER_REGISTERED_CODECS_START) {
    BLOSC_TRACE_ERROR("The compcode must be greater or equal than %d", BLOSC2_USER_REGISTERED_CODECS_START);
    return BLOSC2_ERROR_CODEC_PARAM;
  }

  return register_codec_private(codec);
}
3893
3894
_blosc2_register_io_cb(const blosc2_io_cb * io)3895 int _blosc2_register_io_cb(const blosc2_io_cb *io) {
3896
3897 // Check if the io is already registered
3898 for (int i = 0; i < g_nio; ++i) {
3899 if (g_io[i].id == io->id) {
3900 BLOSC_TRACE_ERROR("The codec is already registered!");
3901 return BLOSC2_ERROR_PLUGIN_IO;
3902 }
3903 }
3904
3905 blosc2_io_cb *io_new = &g_io[g_nio++];
3906 memcpy(io_new, io, sizeof(blosc2_io_cb));
3907
3908 return BLOSC2_ERROR_SUCCESS;
3909 }
3910
blosc2_register_io_cb(const blosc2_io_cb * io)3911 int blosc2_register_io_cb(const blosc2_io_cb *io) {
3912 BLOSC_ERROR_NULL(io, BLOSC2_ERROR_INVALID_PARAM);
3913 if (g_nio == UINT8_MAX) {
3914 BLOSC_TRACE_ERROR("Can not register more codecs");
3915 return BLOSC2_ERROR_PLUGIN_IO;
3916 }
3917
3918 if (io->id < BLOSC2_IO_REGISTERED) {
3919 BLOSC_TRACE_ERROR("The compcode must be greater or equal than %d", BLOSC2_IO_REGISTERED);
3920 return BLOSC2_ERROR_PLUGIN_IO;
3921 }
3922
3923 return _blosc2_register_io_cb(io);
3924 }
3925
blosc2_get_io_cb(uint8_t id)3926 blosc2_io_cb *blosc2_get_io_cb(uint8_t id) {
3927 for (int i = 0; i < g_nio; ++i) {
3928 if (g_io[i].id == id) {
3929 return &g_io[i];
3930 }
3931 }
3932 if (id == BLOSC2_IO_FILESYSTEM) {
3933 if (_blosc2_register_io_cb(&BLOSC2_IO_CB_DEFAULTS) < 0) {
3934 BLOSC_TRACE_ERROR("Error registering the default IO API");
3935 return NULL;
3936 }
3937 return blosc2_get_io_cb(id);
3938 }
3939 return NULL;
3940 }
3941