1 /*********************************************************************
2   Blosc - Blocked Shuffling and Compression Library
3 
4   Copyright (C) 2021  The Blosc Developers <blosc@blosc.org>
5   https://blosc.org
6   License: BSD 3-Clause (see LICENSE.txt)
7 
8   See LICENSE.txt for details about copyright and rights to use.
9 **********************************************************************/
10 
11 
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <errno.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <assert.h>
18 #include <math.h>
19 
20 #include "blosc2.h"
21 #include "blosc-private.h"
22 #include "frame.h"
23 
24 
25 #if defined(USING_CMAKE)
26   #include "config.h"
27 #endif /*  USING_CMAKE */
28 #include "context.h"
29 
30 #include "shuffle.h"
31 #include "delta.h"
32 #include "trunc-prec.h"
33 #include "blosclz.h"
34 #include "stune.h"
35 #include "config.h"
36 #include "blosc2/codecs-registry.h"
37 #include "blosc2/filters-registry.h"
38 
39 #include "lz4.h"
40 #include "lz4hc.h"
41 #ifdef HAVE_IPP
42   #include <ipps.h>
43   #include <ippdc.h>
44 #endif
45 #if defined(HAVE_ZLIB_NG)
46   #include "zlib.h"
47 #elif defined(HAVE_ZLIB)
48   #include "zlib.h"
49 #endif /*  HAVE_MINIZ */
50 #if defined(HAVE_ZSTD)
51   #include "zstd.h"
52   #include "zstd_errors.h"
53   // #include "cover.h"  // for experimenting with fast cover training for building dicts
54   #include "zdict.h"
55 #endif /*  HAVE_ZSTD */
56 
57 
58 #if defined(_WIN32) && !defined(__MINGW32__)
59   #include <windows.h>
60   #include <malloc.h>
61   #include <process.h>
62   #define getpid _getpid
63 #endif  /* _WIN32 */
64 
65 #if defined(_WIN32) && !defined(__GNUC__)
66   #include "win32/pthread.c"
67 #endif
68 
69 /* Synchronization variables */
70 
71 /* Global context for non-contextual API */
72 static blosc2_context* g_global_context;
73 static pthread_mutex_t global_comp_mutex;
74 static int g_compressor = BLOSC_BLOSCLZ;
75 static int g_delta = 0;
76 /* the compressor to use by default */
77 static int16_t g_nthreads = 1;
78 static int32_t g_force_blocksize = 0;
79 static int g_initlib = 0;
80 static blosc2_schunk* g_schunk = NULL;   /* the pointer to super-chunk */
81 
82 blosc2_codec g_codecs[256] = {0};
83 uint8_t g_ncodecs = 0;
84 
85 static blosc2_filter g_filters[256] = {0};
86 static uint64_t g_nfilters = 0;
87 
88 static blosc2_io_cb g_io[256] = {0};
89 static uint64_t g_nio = 0;
90 
91 
92 // Forward declarations
93 int init_threadpool(blosc2_context *context);
94 int release_threadpool(blosc2_context *context);
95 
96 /* Macros for synchronization */
97 
/* Wait until all threads are initialized.
 *
 * CONTEXT_PTR points to the context that owns the synchronization objects.
 *
 * Barrier flavour: waits on `barr_init`, stores the result in a variable
 * named `rc` that must already be declared where the macro is expanded, and
 * makes the *enclosing function* return RET_VAL if the wait fails.
 *
 * Mutex/condvar flavour: RET_VAL is unused.  Under `count_threads_mutex`, a
 * caller that finds `count_threads < nthreads` bumps the counter and sleeps
 * on `count_threads_cv`; otherwise it broadcasts on the condition variable.
 *
 * The expansion is a bare statement sequence (no do { } while (0) wrapper),
 * so it must not be used as the unbraced body of an if/else.
 */
#ifdef BLOSC_POSIX_BARRIERS
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                              \
  rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_init);              \
  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {              \
    BLOSC_TRACE_ERROR("Could not wait on barrier (init): %d", rc);   \
    return((RET_VAL));                                               \
  }
#else
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                              \
  pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);           \
  if ((CONTEXT_PTR)->count_threads < (CONTEXT_PTR)->nthreads) {      \
    (CONTEXT_PTR)->count_threads++;                                  \
    pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,              \
                      &(CONTEXT_PTR)->count_threads_mutex);          \
  }                                                                  \
  else {                                                             \
    pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv);        \
  }                                                                  \
  pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);
#endif
119 
/* Wait for all threads to finish.
 *
 * CONTEXT_PTR points to the context that owns the synchronization objects.
 *
 * Barrier flavour: waits on `barr_finish`, stores the result in a variable
 * named `rc` that must already be declared where the macro is expanded, and
 * makes the *enclosing function* return RET_VAL if the wait fails.
 *
 * Mutex/condvar flavour: RET_VAL is unused.  Under `count_threads_mutex`, a
 * caller that finds `count_threads > 0` decrements the counter and sleeps on
 * `count_threads_cv`; otherwise it broadcasts on the condition variable.
 *
 * The expansion is a bare statement sequence (no do { } while (0) wrapper),
 * so it must not be used as the unbraced body of an if/else.
 */
#ifdef BLOSC_POSIX_BARRIERS
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                            \
  rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_finish);            \
  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {              \
    BLOSC_TRACE_ERROR("Could not wait on barrier (finish)");         \
    return((RET_VAL));                                               \
  }
#else
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                            \
  pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);           \
  if ((CONTEXT_PTR)->count_threads > 0) {                            \
    (CONTEXT_PTR)->count_threads--;                                  \
    pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,              \
                      &(CONTEXT_PTR)->count_threads_mutex);          \
  }                                                                  \
  else {                                                             \
    pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv);        \
  }                                                                  \
  pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);
#endif
141 
142 
143 /* global variable to change threading backend from Blosc-managed to caller-managed */
144 static blosc_threads_callback threads_callback = 0;
145 static void *threads_callback_data = 0;
146 
147 /* non-threadsafe function should be called before any other Blosc function in
148    order to change how threads are managed */
blosc_set_threads_callback(blosc_threads_callback callback,void * callback_data)149 void blosc_set_threads_callback(blosc_threads_callback callback, void *callback_data)
150 {
151   threads_callback = callback;
152   threads_callback_data = callback_data;
153 }
154 
155 
/* Portable aligned allocation.
 *
 * Returns a block of at least `size` bytes, or NULL (after tracing an error)
 * when the allocation fails.  The block is aligned to 32 bytes (because AVX2
 * is supported) on Windows and on platforms that advertise posix_memalign();
 * elsewhere it falls back to plain malloc() and its default alignment.
 * Release the block with my_free().
 */
static uint8_t* my_malloc(size_t size) {
  void* ptr = NULL;
  int rc = 0;

#if defined(_WIN32)
  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
  ptr = (void *)_aligned_malloc(size, 32);
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
  /* Platform does have an implementation of posix_memalign */
  rc = posix_memalign(&ptr, 32, size);
#else
  ptr = malloc(size);
#endif  /* _WIN32 */

  if (rc == 0 && ptr != NULL) {
    return (uint8_t*)ptr;
  }

  BLOSC_TRACE_ERROR("Error allocating memory!");
  return NULL;
}
179 
180 
/* Release a block obtained from my_malloc(), using the deallocator that
   matches the allocator chosen there for the current platform. */
static void my_free(void* block) {
#if !defined(_WIN32)
  free(block);
#else
  _aligned_free(block);
#endif  /* !_WIN32 */
}
189 
190 
191 /*
192  * Conversion routines between compressor and compression libraries
193  */
194 
195 /* Return the library code associated with the compressor name */
compname_to_clibcode(const char * compname)196 static int compname_to_clibcode(const char* compname) {
197   if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
198     return BLOSC_BLOSCLZ_LIB;
199   if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
200     return BLOSC_LZ4_LIB;
201   if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
202     return BLOSC_LZ4_LIB;
203   if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
204     return BLOSC_ZLIB_LIB;
205   if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
206     return BLOSC_ZSTD_LIB;
207   for (int i = 0; i < g_ncodecs; ++i) {
208     if (strcmp(compname, g_codecs[i].compname) == 0)
209       return g_codecs[i].complib;
210   }
211   return BLOSC2_ERROR_NOT_FOUND;
212 }
213 
214 /* Return the library name associated with the compressor code */
clibcode_to_clibname(int clibcode)215 static const char* clibcode_to_clibname(int clibcode) {
216   if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
217   if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
218   if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
219   if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
220   for (int i = 0; i < g_ncodecs; ++i) {
221     if (clibcode == g_codecs[i].complib)
222       return g_codecs[i].compname;
223   }
224   return NULL;                  /* should never happen */
225 }
226 
227 
228 /*
229  * Conversion routines between compressor names and compressor codes
230  */
231 
232 /* Get the compressor name associated with the compressor code */
blosc_compcode_to_compname(int compcode,const char ** compname)233 int blosc_compcode_to_compname(int compcode, const char** compname) {
234   int code = -1;    /* -1 means non-existent compressor code */
235   const char* name = NULL;
236 
237   /* Map the compressor code */
238   if (compcode == BLOSC_BLOSCLZ)
239     name = BLOSC_BLOSCLZ_COMPNAME;
240   else if (compcode == BLOSC_LZ4)
241     name = BLOSC_LZ4_COMPNAME;
242   else if (compcode == BLOSC_LZ4HC)
243     name = BLOSC_LZ4HC_COMPNAME;
244   else if (compcode == BLOSC_ZLIB)
245     name = BLOSC_ZLIB_COMPNAME;
246   else if (compcode == BLOSC_ZSTD)
247     name = BLOSC_ZSTD_COMPNAME;
248   else {
249     for (int i = 0; i < g_ncodecs; ++i) {
250       if (compcode == g_codecs[i].compcode) {
251         name = g_codecs[i].compname;
252         break;
253       }
254     }
255   }
256 
257   *compname = name;
258 
259   /* Guess if there is support for this code */
260   if (compcode == BLOSC_BLOSCLZ)
261     code = BLOSC_BLOSCLZ;
262   else if (compcode == BLOSC_LZ4)
263     code = BLOSC_LZ4;
264   else if (compcode == BLOSC_LZ4HC)
265     code = BLOSC_LZ4HC;
266 #if defined(HAVE_ZLIB)
267   else if (compcode == BLOSC_ZLIB)
268     code = BLOSC_ZLIB;
269 #endif /* HAVE_ZLIB */
270 #if defined(HAVE_ZSTD)
271   else if (compcode == BLOSC_ZSTD)
272     code = BLOSC_ZSTD;
273 #endif /* HAVE_ZSTD */
274   else if (compcode >= BLOSC_LAST_CODEC)
275     code = compcode;
276   return code;
277 }
278 
279 /* Get the compressor code for the compressor name. -1 if it is not available */
blosc_compname_to_compcode(const char * compname)280 int blosc_compname_to_compcode(const char* compname) {
281   int code = -1;  /* -1 means non-existent compressor code */
282 
283   if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
284     code = BLOSC_BLOSCLZ;
285   }
286   else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
287     code = BLOSC_LZ4;
288   }
289   else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
290     code = BLOSC_LZ4HC;
291   }
292 #if defined(HAVE_ZLIB)
293   else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
294     code = BLOSC_ZLIB;
295   }
296 #endif /*  HAVE_ZLIB */
297 #if defined(HAVE_ZSTD)
298   else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
299     code = BLOSC_ZSTD;
300   }
301 #endif /*  HAVE_ZSTD */
302   else{
303     for (int i = 0; i < g_ncodecs; ++i) {
304       if (strcmp(compname, g_codecs[i].compname) == 0) {
305         code = g_codecs[i].compcode;
306         break;
307       }
308     }
309   }
310   return code;
311 }
312 
313 
314 /* Convert compressor code to blosc compressor format code */
compcode_to_compformat(int compcode)315 static int compcode_to_compformat(int compcode) {
316   switch (compcode) {
317     case BLOSC_BLOSCLZ: return BLOSC_BLOSCLZ_FORMAT;
318     case BLOSC_LZ4:     return BLOSC_LZ4_FORMAT;
319     case BLOSC_LZ4HC:   return BLOSC_LZ4HC_FORMAT;
320 
321 #if defined(HAVE_ZLIB)
322     case BLOSC_ZLIB:    return BLOSC_ZLIB_FORMAT;
323 #endif /*  HAVE_ZLIB */
324 
325 #if defined(HAVE_ZSTD)
326     case BLOSC_ZSTD:    return BLOSC_ZSTD_FORMAT;
327       break;
328 #endif /*  HAVE_ZSTD */
329     default:
330       return BLOSC_UDCODEC_FORMAT;
331   }
332   return -1;
333 }
334 
335 
336 /* Convert compressor code to blosc compressor format version */
compcode_to_compversion(int compcode)337 static int compcode_to_compversion(int compcode) {
338   /* Write compressor format */
339   switch (compcode) {
340     case BLOSC_BLOSCLZ: return BLOSC_BLOSCLZ_VERSION_FORMAT;
341     case BLOSC_LZ4:     return BLOSC_LZ4_VERSION_FORMAT;
342     case BLOSC_LZ4HC:   return BLOSC_LZ4HC_VERSION_FORMAT;
343 
344 #if defined(HAVE_ZLIB)
345     case BLOSC_ZLIB:    return BLOSC_ZLIB_VERSION_FORMAT;
346       break;
347 #endif /*  HAVE_ZLIB */
348 
349 #if defined(HAVE_ZSTD)
350     case BLOSC_ZSTD:    return BLOSC_ZSTD_VERSION_FORMAT;
351       break;
352 #endif /*  HAVE_ZSTD */
353     default:
354       for (int i = 0; i < g_ncodecs; ++i) {
355         if (compcode == g_codecs[i].compcode) {
356           return g_codecs[i].compver;
357         }
358       }
359   }
360   return -1;
361 }
362 
363 
/* Compress `input_length` bytes from `input` into `output` (capacity
 * `maxout`) with LZ4.
 *
 * With IPP: `hash_table` must be non-NULL (BLOSC2_ERROR_INVALID_PARAM
 * otherwise).  Returns the compressed size, 0 when the data does not fit in
 * `maxout`, or BLOSC2_ERROR_FAILURE on any other IPP error.
 *
 * Without IPP: `hash_table` is ignored and the result of LZ4_compress_fast()
 * is returned as is.
 *
 * `accel` is ignored in both builds: acceleration is pinned to 1 so that the
 * plain LZ4 path matches the IPP behaviour.
 */
static int lz4_wrap_compress(const char* input, size_t input_length,
                             char* output, size_t maxout, int accel, void* hash_table) {
  BLOSC_UNUSED_PARAM(accel);
#ifdef HAVE_IPP
  if (hash_table == NULL) {
    return BLOSC2_ERROR_INVALID_PARAM;  // the hash table should always be initialized
  }
  int src_len = (int)input_length;
  int dst_len = (int)maxout;
  // I have not found any function that uses `accel` like in `LZ4_compress_fast`, but
  // the IPP LZ4Safe call does a pretty good job on compressing well, so let's use it
  IppStatus status = ippsEncodeLZ4Safe_8u((const Ipp8u*)input, &src_len,
                                           (Ipp8u*)output, &dst_len, (Ipp8u*)hash_table);
  if (status == ippStsNoErr) {
    return dst_len;
  }
  if (status == ippStsDstSizeLessExpected) {
    return 0;  // we cannot compress in required outlen
  }
  return BLOSC2_ERROR_FAILURE;  // an unexpected error happened
#else
  BLOSC_UNUSED_PARAM(hash_table);
  // acceleration deactivated (fixed to 1) to match IPP behaviour
  return LZ4_compress_fast(input, output, (int)input_length, (int)maxout, 1);
#endif
}
392 
393 
lz4hc_wrap_compress(const char * input,size_t input_length,char * output,size_t maxout,int clevel)394 static int lz4hc_wrap_compress(const char* input, size_t input_length,
395                                char* output, size_t maxout, int clevel) {
396   int cbytes;
397   if (input_length > (size_t)(UINT32_C(2) << 30))
398     return BLOSC2_ERROR_2GB_LIMIT;
399   /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
400    * but levels larger than 9 do not buy much compression. */
401   cbytes = LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
402                            clevel);
403   return cbytes;
404 }
405 
406 
/* Decompress `compressed_length` bytes from `input` into `output` with LZ4.
 *
 * The result is only accepted when exactly `maxout` bytes were produced:
 * returns `maxout` in that case and 0 for anything else, decoder errors
 * included.
 */
static int lz4_wrap_decompress(const char* input, size_t compressed_length,
                               char* output, size_t maxout) {
  const int expected = (int)maxout;
  int produced;
#ifdef HAVE_IPP
  int dst_len = (int)maxout;
  int src_len = (int)compressed_length;
  IppStatus status;
  status = ippsDecodeLZ4_8u((const Ipp8u*)input, src_len, (Ipp8u*)output, &dst_len);
  //status = ippsDecodeLZ4Dict_8u((const Ipp8u*)input, &inlen, (Ipp8u*)output, 0, &outlen, NULL, 1 << 16);
  produced = (status == ippStsNoErr) ? dst_len : -dst_len;
#else
  produced = LZ4_decompress_safe(input, output, (int)compressed_length, (int)maxout);
#endif
  return (produced == expected) ? expected : 0;
}
425 
426 #if defined(HAVE_ZLIB)
427 /* zlib is not very respectful with sharing name space with others.
428  Fortunately, its names do not collide with those already in blosc. */
zlib_wrap_compress(const char * input,size_t input_length,char * output,size_t maxout,int clevel)429 static int zlib_wrap_compress(const char* input, size_t input_length,
430                               char* output, size_t maxout, int clevel) {
431   int status;
432   uLongf cl = (uLongf)maxout;
433   status = compress2(
434       (Bytef*)output, &cl, (Bytef*)input, (uLong)input_length, clevel);
435   if (status != Z_OK) {
436     return 0;
437   }
438   return (int)cl;
439 }
440 
zlib_wrap_decompress(const char * input,size_t compressed_length,char * output,size_t maxout)441 static int zlib_wrap_decompress(const char* input, size_t compressed_length,
442                                 char* output, size_t maxout) {
443   int status;
444   uLongf ul = (uLongf)maxout;
445   status = uncompress(
446       (Bytef*)output, &ul, (Bytef*)input, (uLong)compressed_length);
447   if (status != Z_OK) {
448     return 0;
449   }
450   return (int)ul;
451 }
452 #endif /*  HAVE_ZLIB */
453 
454 
455 #if defined(HAVE_ZSTD)
zstd_wrap_compress(struct thread_context * thread_context,const char * input,size_t input_length,char * output,size_t maxout,int clevel)456 static int zstd_wrap_compress(struct thread_context* thread_context,
457                               const char* input, size_t input_length,
458                               char* output, size_t maxout, int clevel) {
459   size_t code;
460   blosc2_context* context = thread_context->parent_context;
461 
462   clevel = (clevel < 9) ? clevel * 2 - 1 : ZSTD_maxCLevel();
463   /* Make the level 8 close enough to maxCLevel */
464   if (clevel == 8) clevel = ZSTD_maxCLevel() - 2;
465 
466   if (thread_context->zstd_cctx == NULL) {
467     thread_context->zstd_cctx = ZSTD_createCCtx();
468   }
469 
470   if (context->use_dict) {
471     assert(context->dict_cdict != NULL);
472     code = ZSTD_compress_usingCDict(
473             thread_context->zstd_cctx, (void*)output, maxout, (void*)input,
474             input_length, context->dict_cdict);
475   } else {
476     code = ZSTD_compressCCtx(thread_context->zstd_cctx,
477         (void*)output, maxout, (void*)input, input_length, clevel);
478   }
479   if (ZSTD_isError(code) != ZSTD_error_no_error) {
480     // Do not print anything because blosc will just memcpy this buffer
481     // fprintf(stderr, "Error in ZSTD compression: '%s'.  Giving up.\n",
482     //         ZDICT_getErrorName(code));
483     return 0;
484   }
485   return (int)code;
486 }
487 
zstd_wrap_decompress(struct thread_context * thread_context,const char * input,size_t compressed_length,char * output,size_t maxout)488 static int zstd_wrap_decompress(struct thread_context* thread_context,
489                                 const char* input, size_t compressed_length,
490                                 char* output, size_t maxout) {
491   size_t code;
492   blosc2_context* context = thread_context->parent_context;
493 
494   if (thread_context->zstd_dctx == NULL) {
495     thread_context->zstd_dctx = ZSTD_createDCtx();
496   }
497 
498   if (context->use_dict) {
499     assert(context->dict_ddict != NULL);
500     code = ZSTD_decompress_usingDDict(
501             thread_context->zstd_dctx, (void*)output, maxout, (void*)input,
502             compressed_length, context->dict_ddict);
503   } else {
504     code = ZSTD_decompressDCtx(thread_context->zstd_dctx,
505         (void*)output, maxout, (void*)input, compressed_length);
506   }
507   if (ZSTD_isError(code) != ZSTD_error_no_error) {
508     BLOSC_TRACE_ERROR("Error in ZSTD decompression: '%s'.  Giving up.",
509                       ZDICT_getErrorName(code));
510     return 0;
511   }
512   return (int)code;
513 }
514 #endif /*  HAVE_ZSTD */
515 
516 /* Compute acceleration for blosclz */
get_accel(const blosc2_context * context)517 static int get_accel(const blosc2_context* context) {
518   int clevel = context->clevel;
519 
520   if (context->compcode == BLOSC_LZ4) {
521     /* This acceleration setting based on discussions held in:
522      * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
523      */
524     return (10 - clevel);
525   }
526   return 1;
527 }
528 
529 
do_nothing(int8_t filter,char cmode)530 int do_nothing(int8_t filter, char cmode) {
531   if (cmode == 'c') {
532     return (filter == BLOSC_NOFILTER);
533   } else {
534     // TRUNC_PREC do not have to be applied during decompression
535     return ((filter == BLOSC_NOFILTER) || (filter == BLOSC_TRUNC_PREC));
536   }
537 }
538 
539 
next_filter(const uint8_t * filters,int current_filter,char cmode)540 int next_filter(const uint8_t* filters, int current_filter, char cmode) {
541   for (int i = current_filter - 1; i >= 0; i--) {
542     if (!do_nothing(filters[i], cmode)) {
543       return filters[i];
544     }
545   }
546   return BLOSC_NOFILTER;
547 }
548 
549 
last_filter(const uint8_t * filters,char cmode)550 int last_filter(const uint8_t* filters, char cmode) {
551   int last_index = -1;
552   for (int i = BLOSC2_MAX_FILTERS - 1; i >= 0; i--) {
553     if (!do_nothing(filters[i], cmode))  {
554       last_index = i;
555     }
556   }
557   return last_index;
558 }
559 
560 
561 /* Convert filter pipeline to filter flags */
filters_to_flags(const uint8_t * filters)562 static uint8_t filters_to_flags(const uint8_t* filters) {
563   uint8_t flags = 0;
564 
565   for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
566     switch (filters[i]) {
567       case BLOSC_SHUFFLE:
568         flags |= BLOSC_DOSHUFFLE;
569         break;
570       case BLOSC_BITSHUFFLE:
571         flags |= BLOSC_DOBITSHUFFLE;
572         break;
573       case BLOSC_DELTA:
574         flags |= BLOSC_DODELTA;
575         break;
576       default :
577         break;
578     }
579   }
580   return flags;
581 }
582 
583 
584 /* Convert filter flags to filter pipeline */
flags_to_filters(const uint8_t flags,uint8_t * filters)585 static void flags_to_filters(const uint8_t flags, uint8_t* filters) {
586   /* Initialize the filter pipeline */
587   memset(filters, 0, BLOSC2_MAX_FILTERS);
588   /* Fill the filter pipeline */
589   if (flags & BLOSC_DOSHUFFLE)
590     filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_SHUFFLE;
591   if (flags & BLOSC_DOBITSHUFFLE)
592     filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_BITSHUFFLE;
593   if (flags & BLOSC_DODELTA)
594     filters[BLOSC2_MAX_FILTERS - 2] = BLOSC_DELTA;
595 }
596 
597 
598 /* Get filter flags from header flags */
get_filter_flags(const uint8_t header_flags,const int32_t typesize)599 static uint8_t get_filter_flags(const uint8_t header_flags,
600                                 const int32_t typesize) {
601   uint8_t flags = 0;
602 
603   if ((header_flags & BLOSC_DOSHUFFLE) && (typesize > 1)) {
604     flags |= BLOSC_DOSHUFFLE;
605   }
606   if (header_flags & BLOSC_DOBITSHUFFLE) {
607     flags |= BLOSC_DOBITSHUFFLE;
608   }
609   if (header_flags & BLOSC_DODELTA) {
610     flags |= BLOSC_DODELTA;
611   }
612   if (header_flags & BLOSC_MEMCPYED) {
613     flags |= BLOSC_MEMCPYED;
614   }
615   return flags;
616 }
617 
618 typedef struct blosc_header_s {
619   uint8_t version;
620   uint8_t versionlz;
621   uint8_t flags;
622   uint8_t typesize;
623   int32_t nbytes;
624   int32_t blocksize;
625   int32_t cbytes;
626   // Extended Blosc2 header
627   uint8_t filter_codes[BLOSC2_MAX_FILTERS];
628   uint8_t udcompcode;
629   uint8_t compcode_meta;
630   uint8_t filter_meta[BLOSC2_MAX_FILTERS];
631   uint8_t reserved2;
632   uint8_t blosc2_flags;
633 } blosc_header;
634 
635 
/* Parse and validate the chunk header found at `src` into `header`.
 *
 * `srcsize` is the number of readable bytes at `src`.  The minimal header is
 * always read; the 32-bit fields are byte-swapped on big-endian hosts.  The
 * extended part is read only when `extended_header` is true and the header
 * flags have both BLOSC_DOSHUFFLE and BLOSC_DOBITSHUFFLE set; otherwise the
 * filter pipeline is derived from the flags.
 *
 * Returns 0 on success, or one of BLOSC2_ERROR_READ_BUFFER,
 * BLOSC2_ERROR_VERSION_SUPPORT and BLOSC2_ERROR_INVALID_HEADER.
 */
int read_chunk_header(const uint8_t* src, int32_t srcsize, bool extended_header, blosc_header* header)
{
  memset(header, 0, sizeof(blosc_header));

  if (srcsize < BLOSC_MIN_HEADER_LENGTH) {
    BLOSC_TRACE_ERROR("Not enough space to read Blosc header.");
    return BLOSC2_ERROR_READ_BUFFER;
  }

  memcpy(header, src, BLOSC_MIN_HEADER_LENGTH);

  if (!is_little_endian()) {
    header->nbytes = bswap32_(header->nbytes);
    header->blocksize = bswap32_(header->blocksize);
    header->cbytes = bswap32_(header->cbytes);
  }

  /* Sanity checks on the minimal header */
  if (header->version > BLOSC_VERSION_FORMAT) {
    /* Version from future */
    return BLOSC2_ERROR_VERSION_SUPPORT;
  }
  if (header->cbytes < BLOSC_MIN_HEADER_LENGTH) {
    BLOSC_TRACE_ERROR("`cbytes` is too small to read min header.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->blocksize <= 0 || (header->nbytes > 0 && (header->blocksize > header->nbytes))) {
    BLOSC_TRACE_ERROR("`blocksize` is zero or greater than uncompressed size");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->blocksize > BLOSC2_MAXBLOCKSIZE) {
    BLOSC_TRACE_ERROR("`blocksize` greater than maximum allowed");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->typesize <= 0 || header->typesize > BLOSC_MAX_TYPESIZE) {
    BLOSC_TRACE_ERROR("`typesize` is zero or greater than max allowed.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }

  /* Both shuffle flags set at once mark an extended header */
  const bool marked_extended =
      (header->flags & BLOSC_DOSHUFFLE) && (header->flags & BLOSC_DOBITSHUFFLE);
  if (!extended_header || !marked_extended) {
    flags_to_filters(header->flags, header->filter_codes);
    return 0;
  }

  /* Read extended header if it is wanted */
  if (header->cbytes < BLOSC_EXTENDED_HEADER_LENGTH) {
    BLOSC_TRACE_ERROR("`cbytes` is too small to read extended header.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (srcsize < BLOSC_EXTENDED_HEADER_LENGTH) {
    BLOSC_TRACE_ERROR("Not enough space to read Blosc extended header.");
    return BLOSC2_ERROR_READ_BUFFER;
  }

  memcpy((uint8_t *)header + BLOSC_MIN_HEADER_LENGTH, src + BLOSC_MIN_HEADER_LENGTH,
         BLOSC_EXTENDED_HEADER_LENGTH - BLOSC_MIN_HEADER_LENGTH);

  const int32_t special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
  if (special_type != 0) {
    if (header->nbytes % header->typesize != 0) {
      BLOSC_TRACE_ERROR("`nbytes` is not a multiple of typesize");
      return BLOSC2_ERROR_INVALID_HEADER;
    }
    if (special_type == BLOSC2_SPECIAL_VALUE &&
        header->cbytes < BLOSC_EXTENDED_HEADER_LENGTH + header->typesize) {
      BLOSC_TRACE_ERROR("`cbytes` is too small for run length encoding");
      return BLOSC2_ERROR_READ_BUFFER;
    }
  }

  // The number of filters depends on the version of the header. Blosc2 alpha series
  // did not initialize filters to zero beyond the max supported.
  if (header->version == BLOSC2_VERSION_FORMAT_ALPHA) {
    header->filter_codes[5] = 0;
    header->filter_meta[5] = 0;
  }
  return 0;
}
715 
/* Compute number of blocks in buffer.
 *
 * Sets `leftover` to `sourcesize % blocksize` and `nblocks` to the number of
 * blocks needed to cover `sourcesize`, counting a trailing partial block.
 * `blocksize` must be non-zero.
 */
static inline void blosc2_calculate_blocks(blosc2_context* context) {
  context->leftover = context->sourcesize % context->blocksize;
  context->nblocks = context->sourcesize / context->blocksize;
  if (context->leftover > 0) {
    /* One extra block for the remainder */
    context->nblocks = context->nblocks + 1;
  }
}
723 
/* Populate `context` from an already parsed chunk `header`.
 *
 * Copies sizes and flags, derives the codec code (taking `udcompcode` when
 * the format bits say BLOSC_UDCODEC_FORMAT), computes the block layout and
 * sets up the filter pipeline.  The pipeline comes from the extended header
 * fields when both shuffle flags are set, and from the flags otherwise.
 *
 * Returns 0 on success, or BLOSC2_ERROR_INVALID_HEADER when a non-lazy chunk
 * claims more compressed bytes than `context->srcsize`.
 */
static int blosc2_initialize_context_from_header(blosc2_context* context, blosc_header* header) {
  context->header_flags = header->flags;
  context->typesize = header->typesize;
  context->sourcesize = header->nbytes;
  context->blocksize = header->blocksize;
  context->blosc2_flags = header->blosc2_flags;

  /* The format lives in the top 3 bits of the flags */
  context->compcode = header->flags >> 5;
  if (context->compcode == BLOSC_UDCODEC_FORMAT) {
    context->compcode = header->udcompcode;
  }
  blosc2_calculate_blocks(context);

  const bool extended = (context->header_flags & BLOSC_DOSHUFFLE) &&
                        (context->header_flags & BLOSC_DOBITSHUFFLE);
  bool is_lazy = false;

  if (!extended) {
    context->header_overhead = BLOSC_MIN_HEADER_LENGTH;
    context->filter_flags = get_filter_flags(context->header_flags, context->typesize);
    flags_to_filters(context->header_flags, context->filters);
  }
  else {
    /* Extended header */
    context->header_overhead = BLOSC_EXTENDED_HEADER_LENGTH;

    memcpy(context->filters, header->filter_codes, BLOSC2_MAX_FILTERS);
    memcpy(context->filters_meta, header->filter_meta, BLOSC2_MAX_FILTERS);
    context->compcode_meta = header->compcode_meta;

    context->filter_flags = filters_to_flags(header->filter_codes);
    context->special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;

    is_lazy = (context->blosc2_flags & 0x08u);
  }

  // Some checks for malformed headers
  if (!is_lazy && header->cbytes > context->srcsize) {
    return BLOSC2_ERROR_INVALID_HEADER;
  }

  return 0;
}
764 
765 
/* Build a chunk `header` from the settings held in `context`.
 *
 * The header is zeroed first.  `nbytes` and `blocksize` are byte-swapped on
 * big-endian hosts; `cbytes` is left at 0 here because it is written after
 * compression.  With `extended_header` set, the filter pipeline, the codec
 * code and its meta byte are stored too, and `blosc2_flags` records
 * big-endian hosts and dictionary usage.  Always returns 0.
 */
static int blosc2_intialize_header_from_context(blosc2_context* context, blosc_header* header, bool extended_header) {
  const bool big_endian = !is_little_endian();

  memset(header, 0, sizeof(blosc_header));

  header->version = BLOSC_VERSION_FORMAT;
  header->versionlz = compcode_to_compversion(context->compcode);
  header->flags = context->header_flags;
  header->typesize = (uint8_t)context->typesize;
  header->nbytes = (int32_t)context->sourcesize;
  header->blocksize = (int32_t)context->blocksize;

  if (big_endian) {
    header->nbytes = bswap32_(header->nbytes);
    header->blocksize = bswap32_(header->blocksize);
    // cbytes written after compression
  }

  if (!extended_header) {
    return 0;
  }

  /* Store filter pipeline info at the end of the header */
  memcpy(header->filter_codes, context->filters, BLOSC2_MAX_FILTERS);
  memcpy(header->filter_meta, context->filters_meta, BLOSC2_MAX_FILTERS);
  header->udcompcode = context->compcode;
  header->compcode_meta = context->compcode_meta;

  if (big_endian) {
    header->blosc2_flags |= BLOSC2_BIGENDIAN;
  }
  if (context->use_dict) {
    header->blosc2_flags |= BLOSC2_USEDICT;
  }

  return 0;
}
802 
803 
pipeline_forward(struct thread_context * thread_context,const int32_t bsize,const uint8_t * src,const int32_t offset,uint8_t * dest,uint8_t * tmp,uint8_t * tmp2)804 uint8_t* pipeline_forward(struct thread_context* thread_context, const int32_t bsize,
805                           const uint8_t* src, const int32_t offset,
806                           uint8_t* dest, uint8_t* tmp, uint8_t* tmp2) {
807   blosc2_context* context = thread_context->parent_context;
808   uint8_t* _src = (uint8_t*)src + offset;
809   uint8_t* _tmp = tmp;
810   uint8_t* _dest = dest;
811   int32_t typesize = context->typesize;
812   uint8_t* filters = context->filters;
813   uint8_t* filters_meta = context->filters_meta;
814   bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
815 
816   /* Prefilter function */
817   if (context->prefilter != NULL) {
818     // Create new prefilter parameters for this block (must be private for each thread)
819     blosc2_prefilter_params preparams;
820     memcpy(&preparams, context->preparams, sizeof(preparams));
821     preparams.in = _src;
822     preparams.out = _dest;
823     preparams.out_size = (size_t)bsize;
824     preparams.out_typesize = typesize;
825     preparams.out_offset = offset;
826     preparams.tid = thread_context->tid;
827     preparams.ttmp = thread_context->tmp;
828     preparams.ttmp_nbytes = thread_context->tmp_nbytes;
829     preparams.ctx = context;
830 
831     if (context->prefilter(&preparams) != 0) {
832       BLOSC_TRACE_ERROR("Execution of prefilter function failed");
833       return NULL;
834     }
835 
836     if (memcpyed) {
837       // No more filters are required
838       return _dest;
839     }
840     // Cycle buffers
841     _src = _dest;
842     _dest = _tmp;
843     _tmp = _src;
844   }
845 
846   /* Process the filter pipeline */
847   for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
848     int rc = BLOSC2_ERROR_SUCCESS;
849     if (filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
850       switch (filters[i]) {
851         case BLOSC_SHUFFLE:
852           for (int j = 0; j <= filters_meta[i]; j++) {
853             shuffle(typesize, bsize, _src, _dest);
854             // Cycle filters when required
855             if (j < filters_meta[i]) {
856               _src = _dest;
857               _dest = _tmp;
858               _tmp = _src;
859             }
860           }
861           break;
862         case BLOSC_BITSHUFFLE:
863           if (bitshuffle(typesize, bsize, _src, _dest, tmp2) < 0) {
864             return NULL;
865           }
866           break;
867         case BLOSC_DELTA:
868           delta_encoder(src, offset, bsize, typesize, _src, _dest);
869           break;
870         case BLOSC_TRUNC_PREC:
871           truncate_precision(filters_meta[i], typesize, bsize, _src, _dest);
872           break;
873         default:
874           if (filters[i] != BLOSC_NOFILTER) {
875             BLOSC_TRACE_ERROR("Filter %d not handled during compression\n", filters[i]);
876             return NULL;
877           }
878       }
879     }
880     else {
881       // Look for the filters_meta in user filters and run it
882       for (int j = 0; j < g_nfilters; ++j) {
883         if (g_filters[j].id == filters[i]) {
884           if (g_filters[j].forward != NULL) {
885             blosc2_cparams cparams;
886             blosc2_ctx_get_cparams(context, &cparams);
887             rc = g_filters[j].forward(_src, _dest, bsize, filters_meta[i], &cparams);
888           } else {
889             BLOSC_TRACE_ERROR("Forward function is NULL");
890             return NULL;
891           }
892           if (rc != BLOSC2_ERROR_SUCCESS) {
893             BLOSC_TRACE_ERROR("User-defined filter %d failed during compression\n", filters[i]);
894             return NULL;
895           }
896           goto urfiltersuccess;
897         }
898       }
899       BLOSC_TRACE_ERROR("User-defined filter %d not found during compression\n", filters[i]);
900       return NULL;
901 
902       urfiltersuccess:;
903 
904     }
905 
906     // Cycle buffers when required
907     if (filters[i] != BLOSC_NOFILTER) {
908       _src = _dest;
909       _dest = _tmp;
910       _tmp = _src;
911     }
912   }
913   return _src;
914 }
915 
916 
/* Detect whether all the bytes in [ip, ip_bound) are equal to the first one.
 *
 * Optimized version for detecting runs: the bulk of the range is compared
 * 8 bytes at a time against a 64-bit broadcast of the first byte, and the
 * remainder is compared byte by byte.
 *
 * Returns true when every byte in the range equals the first one (an empty
 * range is reported as a run, without reading any byte), false otherwise.
 */
static bool get_run(const uint8_t* ip, const uint8_t* ip_bound) {
  if (ip >= ip_bound) {
    /* Nothing to read: do not dereference ip */
    return ip == ip_bound;
  }
  const uint8_t x = *ip;
  uint64_t value, value2;
  /* Broadcast the value for every byte in a 64-bit register */
  memset(&value, x, sizeof(value));
  /* Compare the remaining length instead of computing `ip_bound - 8`, which
     would be a pointer before the buffer for ranges shorter than 8 bytes */
  while ((ip_bound - ip) > 8) {
    /* memcpy is the portable way of doing a (possibly unaligned) 8-byte load;
       compilers turn it into a plain load where the platform allows it */
    memcpy(&value2, ip, sizeof(value2));
    if (value != value2) {
      // Values differ.  We don't have a run.
      return false;
    }
    ip += 8;
  }
  /* Look into the remainder */
  while ((ip < ip_bound) && (*ip == x)) {
    ip++;
  }
  return ip == ip_bound;
}
941 
942 
943 /* Shuffle & compress a single block */
blosc_c(struct thread_context * thread_context,int32_t bsize,int32_t leftoverblock,int32_t ntbytes,int32_t destsize,const uint8_t * src,const int32_t offset,uint8_t * dest,uint8_t * tmp,uint8_t * tmp2)944 static int blosc_c(struct thread_context* thread_context, int32_t bsize,
945                    int32_t leftoverblock, int32_t ntbytes, int32_t destsize,
946                    const uint8_t* src, const int32_t offset, uint8_t* dest,
947                    uint8_t* tmp, uint8_t* tmp2) {
948   blosc2_context* context = thread_context->parent_context;
949   int dont_split = (context->header_flags & 0x10) >> 4;
950   int dict_training = context->use_dict && context->dict_cdict == NULL;
951   int32_t j, neblock, nstreams;
952   int32_t cbytes;                   /* number of compressed bytes in split */
953   int32_t ctbytes = 0;              /* number of compressed bytes in block */
954   int64_t maxout;
955   int32_t typesize = context->typesize;
956   const char* compname;
957   int accel;
958   const uint8_t* _src;
959   uint8_t *_tmp = tmp, *_tmp2 = tmp2;
960   uint8_t *_tmp3 = thread_context->tmp4;
961   int last_filter_index = last_filter(context->filters, 'c');
962   bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
963 
964   if (last_filter_index >= 0 || context->prefilter != NULL) {
965     /* Apply the filter pipeline just for the prefilter */
966     if (memcpyed && context->prefilter != NULL) {
967       // We only need the prefilter output
968       _src = pipeline_forward(thread_context, bsize, src, offset, dest, _tmp2, _tmp3);
969       if (_src == NULL) {
970         return BLOSC2_ERROR_FILTER_PIPELINE;
971       }
972       return bsize;
973     }
974     /* Apply regular filter pipeline */
975     _src = pipeline_forward(thread_context, bsize, src, offset, _tmp, _tmp2, _tmp3);
976     if (_src == NULL) {
977       return BLOSC2_ERROR_FILTER_PIPELINE;
978     }
979   } else {
980     _src = src + offset;
981   }
982 
983   assert(context->clevel > 0);
984 
985   /* Calculate acceleration for different compressors */
986   accel = get_accel(context);
987 
988   /* The number of compressed data streams for this block */
989   if (!dont_split && !leftoverblock && !dict_training) {
990     nstreams = (int32_t)typesize;
991   }
992   else {
993     nstreams = 1;
994   }
995   neblock = bsize / nstreams;
996   for (j = 0; j < nstreams; j++) {
997     if (!dict_training) {
998       dest += sizeof(int32_t);
999       ntbytes += sizeof(int32_t);
1000       ctbytes += sizeof(int32_t);
1001 
1002       const uint8_t *ip = (uint8_t *) _src + j * neblock;
1003       const uint8_t *ipbound = (uint8_t *) _src + (j + 1) * neblock;
1004 
1005       // See whether we have a run here
1006       if (context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH && get_run(ip, ipbound)) {
1007         // A run
1008         int32_t value = _src[j * neblock];
1009         if (ntbytes > destsize) {
1010           return 0;    /* Non-compressible data */
1011         }
1012         // Encode the repeated byte in the first (LSB) byte of the length of the split.
1013         _sw32(dest - 4, -value);    // write the value in two's complement
1014         if (value > 0) {
1015           // Mark the encoding as a run-length (== 0 is always a 0's run)
1016           ntbytes += 1;
1017           ctbytes += 1;
1018           if (ntbytes > destsize) {
1019             return 0;    /* Non-compressible data */
1020           }
1021           // Set MSB bit (sign) to 1 (not really necessary here, but for demonstration purposes)
1022           // dest[-1] |= 0x80;
1023           dest[0] = 0x1;   // set run-length bit (0) in token
1024           dest += 1;
1025         }
1026         continue;
1027       }
1028     }
1029 
1030     maxout = neblock;
1031     if (ntbytes + maxout > destsize) {
1032       /* avoid buffer * overrun */
1033       maxout = (int64_t)destsize - (int64_t)ntbytes;
1034       if (maxout <= 0) {
1035         return 0;                  /* non-compressible block */
1036       }
1037     }
1038     if (dict_training) {
1039       // We are in the build dict state, so don't compress
1040       // TODO: copy only a percentage for sampling
1041       memcpy(dest, _src + j * neblock, (unsigned int)neblock);
1042       cbytes = (int32_t)neblock;
1043     }
1044     else if (context->compcode == BLOSC_BLOSCLZ) {
1045       cbytes = blosclz_compress(context->clevel, _src + j * neblock,
1046                                 (int)neblock, dest, (int)maxout, context);
1047     }
1048     else if (context->compcode == BLOSC_LZ4) {
1049       void *hash_table = NULL;
1050     #ifdef HAVE_IPP
1051       hash_table = (void*)thread_context->lz4_hash_table;
1052     #endif
1053       cbytes = lz4_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
1054                                  (char*)dest, (size_t)maxout, accel, hash_table);
1055     }
1056     else if (context->compcode == BLOSC_LZ4HC) {
1057       cbytes = lz4hc_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
1058                                    (char*)dest, (size_t)maxout, context->clevel);
1059     }
1060   #if defined(HAVE_ZLIB)
1061     else if (context->compcode == BLOSC_ZLIB) {
1062       cbytes = zlib_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
1063                                   (char*)dest, (size_t)maxout, context->clevel);
1064     }
1065   #endif /* HAVE_ZLIB */
1066   #if defined(HAVE_ZSTD)
1067     else if (context->compcode == BLOSC_ZSTD) {
1068       cbytes = zstd_wrap_compress(thread_context,
1069                                   (char*)_src + j * neblock, (size_t)neblock,
1070                                   (char*)dest, (size_t)maxout, context->clevel);
1071     }
1072   #endif /* HAVE_ZSTD */
1073     else if (context->compcode > BLOSC2_DEFINED_CODECS_STOP) {
1074       for (int i = 0; i < g_ncodecs; ++i) {
1075         if (g_codecs[i].compcode == context->compcode) {
1076           blosc2_cparams cparams;
1077           blosc2_ctx_get_cparams(context, &cparams);
1078           cbytes = g_codecs[i].encoder(_src + j * neblock,
1079                                         neblock,
1080                                         dest,
1081                                         maxout,
1082                                         context->compcode_meta,
1083                                         &cparams);
1084           goto urcodecsuccess;
1085         }
1086       }
1087       BLOSC_TRACE_ERROR("User-defined compressor codec %d not found during compression", context->compcode);
1088       return BLOSC2_ERROR_CODEC_SUPPORT;
1089     urcodecsuccess:
1090       ;
1091     } else {
1092       blosc_compcode_to_compname(context->compcode, &compname);
1093       BLOSC_TRACE_ERROR("Blosc has not been compiled with '%s' compression support."
1094                         "Please use one having it.", compname);
1095       return BLOSC2_ERROR_CODEC_SUPPORT;
1096     }
1097 
1098     if (cbytes > maxout) {
1099       /* Buffer overrun caused by compression (should never happen) */
1100       return BLOSC2_ERROR_WRITE_BUFFER;
1101     }
1102     if (cbytes < 0) {
1103       /* cbytes should never be negative */
1104       return BLOSC2_ERROR_DATA;
1105     }
1106     if (!dict_training) {
1107       if (cbytes == 0 || cbytes == neblock) {
1108         /* The compressor has been unable to compress data at all. */
1109         /* Before doing the copy, check that we are not running into a
1110            buffer overflow. */
1111         if ((ntbytes + neblock) > destsize) {
1112           return 0;    /* Non-compressible data */
1113         }
1114         memcpy(dest, _src + j * neblock, (unsigned int)neblock);
1115         cbytes = neblock;
1116       }
1117       _sw32(dest - 4, cbytes);
1118     }
1119     dest += cbytes;
1120     ntbytes += cbytes;
1121     ctbytes += cbytes;
1122   }  /* Closes j < nstreams */
1123 
1124   //printf("c%d", ctbytes);
1125   return ctbytes;
1126 }
1127 
1128 
1129 /* Process the filter pipeline (decompression mode) */
pipeline_backward(struct thread_context * thread_context,const int32_t bsize,uint8_t * dest,const int32_t offset,uint8_t * src,uint8_t * tmp,uint8_t * tmp2,int last_filter_index,int32_t nblock)1130 int pipeline_backward(struct thread_context* thread_context, const int32_t bsize, uint8_t* dest,
1131                const int32_t offset, uint8_t* src, uint8_t* tmp,
1132                uint8_t* tmp2, int last_filter_index, int32_t nblock) {
1133   blosc2_context* context = thread_context->parent_context;
1134   int32_t typesize = context->typesize;
1135   uint8_t* filters = context->filters;
1136   uint8_t* filters_meta = context->filters_meta;
1137   blosc2_filter * urfilters = context->urfilters;
1138   uint8_t* _src = src;
1139   uint8_t* _dest = tmp;
1140   uint8_t* _tmp = tmp2;
1141   int errcode = 0;
1142 
1143   for (int i = BLOSC2_MAX_FILTERS - 1; i >= 0; i--) {
1144     // Delta filter requires the whole chunk ready
1145     int last_copy_filter = (last_filter_index == i) || (next_filter(filters, i, 'd') == BLOSC_DELTA);
1146     if (last_copy_filter && context->postfilter == NULL) {
1147       _dest = dest + offset;
1148     }
1149     int rc = BLOSC2_ERROR_SUCCESS;
1150     if (filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
1151       switch (filters[i]) {
1152         case BLOSC_SHUFFLE:
1153           for (int j = 0; j <= filters_meta[i]; j++) {
1154             unshuffle(typesize, bsize, _src, _dest);
1155             // Cycle filters when required
1156             if (j < filters_meta[i]) {
1157               _src = _dest;
1158               _dest = _tmp;
1159               _tmp = _src;
1160             }
1161             // Check whether we have to copy the intermediate _dest buffer to final destination
1162             if (last_copy_filter && (filters_meta[i] % 2) == 1 && j == filters_meta[i]) {
1163               memcpy(dest + offset, _dest, (unsigned int) bsize);
1164             }
1165           }
1166           break;
1167         case BLOSC_BITSHUFFLE:
1168           if (bitunshuffle(typesize, bsize, _src, _dest, _tmp, context->src[BLOSC2_CHUNK_VERSION]) < 0) {
1169             return BLOSC2_ERROR_FILTER_PIPELINE;
1170           }
1171           break;
1172         case BLOSC_DELTA:
1173           if (context->nthreads == 1) {
1174             /* Serial mode */
1175             delta_decoder(dest, offset, bsize, typesize, _dest);
1176           } else {
1177             /* Force the thread in charge of the block 0 to go first */
1178             pthread_mutex_lock(&context->delta_mutex);
1179             if (context->dref_not_init) {
1180               if (offset != 0) {
1181                 pthread_cond_wait(&context->delta_cv, &context->delta_mutex);
1182               } else {
1183                 delta_decoder(dest, offset, bsize, typesize, _dest);
1184                 context->dref_not_init = 0;
1185                 pthread_cond_broadcast(&context->delta_cv);
1186               }
1187             }
1188             pthread_mutex_unlock(&context->delta_mutex);
1189             if (offset != 0) {
1190               delta_decoder(dest, offset, bsize, typesize, _dest);
1191             }
1192           }
1193           break;
1194         case BLOSC_TRUNC_PREC:
1195           // TRUNC_PREC filter does not need to be undone
1196           break;
1197         default:
1198           if (filters[i] != BLOSC_NOFILTER) {
1199             BLOSC_TRACE_ERROR("Filter %d not handled during decompression.",
1200                               filters[i]);
1201 
1202             errcode = -1;
1203           }
1204       }
1205     } else {
1206         // Look for the filters_meta in user filters and run it
1207         for (int j = 0; j < g_nfilters; ++j) {
1208           if (g_filters[j].id == filters[i]) {
1209             if (g_filters[j].backward != NULL) {
1210               blosc2_dparams dparams;
1211               blosc2_ctx_get_dparams(context, &dparams);
1212               rc = g_filters[j].backward(_src, _dest, bsize, filters_meta[i], &dparams);
1213             } else {
1214               BLOSC_TRACE_ERROR("Backward function is NULL");
1215               return BLOSC2_ERROR_FILTER_PIPELINE;
1216             }
1217             if (rc != BLOSC2_ERROR_SUCCESS) {
1218               BLOSC_TRACE_ERROR("User-defined filter %d failed during decompression.", filters[i]);
1219               return rc;
1220             }
1221             goto urfiltersuccess;
1222           }
1223         }
1224       BLOSC_TRACE_ERROR("User-defined filter %d not found during decompression.", filters[i]);
1225       return BLOSC2_ERROR_FILTER_PIPELINE;
1226       urfiltersuccess:;
1227     }
1228 
1229     // Cycle buffers when required
1230     if ((filters[i] != BLOSC_NOFILTER) && (filters[i] != BLOSC_TRUNC_PREC)) {
1231       _src = _dest;
1232       _dest = _tmp;
1233       _tmp = _src;
1234     }
1235     if (last_filter_index == i) {
1236       break;
1237     }
1238   }
1239 
1240   /* Postfilter function */
1241   if (context->postfilter != NULL) {
1242     // Create new postfilter parameters for this block (must be private for each thread)
1243     blosc2_postfilter_params postparams;
1244     memcpy(&postparams, context->postparams, sizeof(postparams));
1245     postparams.in = _src;
1246     postparams.out = dest + offset;
1247     postparams.size = bsize;
1248     postparams.typesize = typesize;
1249     postparams.offset = nblock * context->blocksize;
1250     postparams.tid = thread_context->tid;
1251     postparams.ttmp = thread_context->tmp;
1252     postparams.ttmp_nbytes = thread_context->tmp_nbytes;
1253     postparams.ctx = context;
1254 
1255     if (context->postfilter(&postparams) != 0) {
1256       BLOSC_TRACE_ERROR("Execution of postfilter function failed");
1257       return BLOSC2_ERROR_POSTFILTER;
1258     }
1259   }
1260 
1261   return errcode;
1262 }
1263 
1264 
set_nans(int32_t typesize,uint8_t * dest,int32_t destsize)1265 static int32_t set_nans(int32_t typesize, uint8_t* dest, int32_t destsize) {
1266   // destsize can only be a multiple of typesize
1267   if (destsize % typesize != 0) {
1268     return -1;
1269   }
1270   int32_t nitems = destsize / typesize;
1271   if (nitems == 0) {
1272     return 0;
1273   }
1274 
1275   if (typesize == 4) {
1276     float* dest_ = (float*)dest;
1277     float val = nanf("");
1278     for (int i = 0; i < nitems; i++) {
1279       dest_[i] = val;
1280     }
1281     return nitems;
1282   }
1283   else if (typesize == 8) {
1284     double* dest_ = (double*)dest;
1285     double val = nan("");
1286     for (int i = 0; i < nitems; i++) {
1287       dest_[i] = val;
1288     }
1289     return nitems;
1290   }
1291 
1292   BLOSC_TRACE_ERROR("Unsupported typesize for NaN");
1293   return BLOSC2_ERROR_DATA;
1294 }
1295 
1296 
set_values(int32_t typesize,const uint8_t * src,uint8_t * dest,int32_t destsize)1297 static int32_t set_values(int32_t typesize, const uint8_t* src, uint8_t* dest, int32_t destsize) {
1298   // destsize can only be a multiple of typesize
1299   int64_t val8;
1300   int64_t* dest8;
1301   int32_t val4;
1302   int32_t* dest4;
1303   int16_t val2;
1304   int16_t* dest2;
1305   int8_t val1;
1306   int8_t* dest1;
1307 
1308   if (destsize % typesize != 0) {
1309     return -1;
1310   }
1311   int32_t nitems = destsize / typesize;
1312   if (nitems == 0) {
1313     return 0;
1314   }
1315 
1316   switch (typesize) {
1317     case 8:
1318       val8 = ((int64_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1319       dest8 = (int64_t*)dest;
1320       for (int i = 0; i < nitems; i++) {
1321         dest8[i] = val8;
1322       }
1323       break;
1324     case 4:
1325       val4 = ((int32_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1326       dest4 = (int32_t*)dest;
1327       for (int i = 0; i < nitems; i++) {
1328         dest4[i] = val4;
1329       }
1330       break;
1331     case 2:
1332       val2 = ((int16_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1333       dest2 = (int16_t*)dest;
1334       for (int i = 0; i < nitems; i++) {
1335         dest2[i] = val2;
1336       }
1337       break;
1338     case 1:
1339       val1 = ((int8_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1340       dest1 = (int8_t*)dest;
1341       for (int i = 0; i < nitems; i++) {
1342         dest1[i] = val1;
1343       }
1344       break;
1345     default:
1346       for (int i = 0; i < nitems; i++) {
1347         memcpy(dest + i * typesize, src + BLOSC_EXTENDED_HEADER_LENGTH, typesize);
1348       }
1349   }
1350 
1351   return nitems;
1352 }
1353 
1354 
1355 /* Decompress & unshuffle a single block */
blosc_d(struct thread_context * thread_context,int32_t bsize,int32_t leftoverblock,bool memcpyed,const uint8_t * src,int32_t srcsize,int32_t src_offset,int32_t nblock,uint8_t * dest,int32_t dest_offset,uint8_t * tmp,uint8_t * tmp2)1356 static int blosc_d(
1357     struct thread_context* thread_context, int32_t bsize,
1358     int32_t leftoverblock, bool memcpyed, const uint8_t* src, int32_t srcsize, int32_t src_offset,
1359     int32_t nblock, uint8_t* dest, int32_t dest_offset, uint8_t* tmp, uint8_t* tmp2) {
1360   blosc2_context* context = thread_context->parent_context;
1361   uint8_t* filters = context->filters;
1362   uint8_t *tmp3 = thread_context->tmp4;
1363   int32_t compformat = (context->header_flags & (uint8_t)0xe0) >> 5u;
1364   int dont_split = (context->header_flags & 0x10) >> 4;
1365   int32_t chunk_nbytes;
1366   int32_t chunk_cbytes;
1367   int nstreams;
1368   int32_t neblock;
1369   int32_t nbytes;                /* number of decompressed bytes in split */
1370   int32_t cbytes;                /* number of compressed bytes in split */
1371   int32_t ctbytes = 0;           /* number of compressed bytes in block */
1372   int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
1373   uint8_t* _dest;
1374   int32_t typesize = context->typesize;
1375   const char* compname;
1376   int rc;
1377 
1378   rc = blosc2_cbuffer_sizes(src, &chunk_nbytes, &chunk_cbytes, NULL);
1379   if (rc < 0) {
1380     return rc;
1381   }
1382 
1383   if (context->block_maskout != NULL && context->block_maskout[nblock]) {
1384     // Do not decompress, but act as if we successfully decompressed everything
1385     return bsize;
1386   }
1387 
1388   // In some situations (lazychunks) the context can arrive uninitialized
1389   // (but BITSHUFFLE needs it for accessing the format of the chunk)
1390   if (context->src == NULL) {
1391     context->src = src;
1392   }
1393 
1394   // Chunks with special values cannot be lazy
1395   bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
1396           (context->blosc2_flags & 0x08u) && !context->special_type);
1397   if (is_lazy) {
1398     // The chunk is on disk, so just lazily load the block
1399     if (context->schunk == NULL) {
1400       BLOSC_TRACE_ERROR("Lazy chunk needs an associated super-chunk.");
1401       return BLOSC2_ERROR_INVALID_PARAM;
1402     }
1403     if (context->schunk->frame == NULL) {
1404       BLOSC_TRACE_ERROR("Lazy chunk needs an associated frame.");
1405       return BLOSC2_ERROR_INVALID_PARAM;
1406     }
1407     blosc2_frame_s* frame = (blosc2_frame_s*)context->schunk->frame;
1408     char* urlpath = frame->urlpath;
1409     int32_t trailer_len = sizeof(int32_t) + sizeof(int64_t) + context->nblocks * sizeof(int32_t);
1410     size_t trailer_offset = BLOSC_EXTENDED_HEADER_LENGTH + context->nblocks * sizeof(int32_t);
1411     int32_t nchunk;
1412     int64_t chunk_offset;
1413     // The nchunk and the offset of the current chunk are in the trailer
1414     nchunk = *(int32_t*)(src + trailer_offset);
1415     chunk_offset = *(int64_t*)(src + trailer_offset + sizeof(int32_t));
1416     // Get the csize of the nblock
1417     int32_t *block_csizes = (int32_t *)(src + trailer_offset + sizeof(int32_t) + sizeof(int64_t));
1418     int32_t block_csize = block_csizes[nblock];
1419     // Read the lazy block on disk
1420     void* fp = NULL;
1421     blosc2_io_cb *io_cb = blosc2_get_io_cb(context->schunk->storage->io->id);
1422     if (io_cb == NULL) {
1423       BLOSC_TRACE_ERROR("Error getting the input/output API");
1424       return BLOSC2_ERROR_PLUGIN_IO;
1425     }
1426 
1427     if (frame->sframe) {
1428       // The chunk is not in the frame
1429       char* chunkpath = malloc(strlen(frame->urlpath) + 1 + 8 + strlen(".chunk") + 1);
1430       BLOSC_ERROR_NULL(chunkpath, BLOSC2_ERROR_MEMORY_ALLOC);
1431       sprintf(chunkpath, "%s/%08X.chunk", frame->urlpath, nchunk);
1432       fp = io_cb->open(chunkpath, "rb", context->schunk->storage->io->params);
1433       free(chunkpath);
1434       // The offset of the block is src_offset
1435       io_cb->seek(fp, src_offset, SEEK_SET);
1436     }
1437     else {
1438       fp = io_cb->open(urlpath, "rb", context->schunk->storage->io->params);
1439       // The offset of the block is src_offset
1440       io_cb->seek(fp, chunk_offset + src_offset, SEEK_SET);
1441     }
1442     // We can make use of tmp3 because it will be used after src is not needed anymore
1443     int64_t rbytes = io_cb->read(tmp3, 1, block_csize, fp);
1444     io_cb->close(fp);
1445     if ((int32_t)rbytes != block_csize) {
1446       BLOSC_TRACE_ERROR("Cannot read the (lazy) block out of the fileframe.");
1447       return BLOSC2_ERROR_READ_BUFFER;
1448     }
1449     src = tmp3;
1450     src_offset = 0;
1451     srcsize = block_csize;
1452   }
1453 
1454   // If the chunk is memcpyed, we just have to copy the block to dest and return
1455   if (memcpyed) {
1456     int bsize_ = leftoverblock ? chunk_nbytes % context->blocksize : bsize;
1457     if (!context->special_type) {
1458       if (chunk_nbytes + context->header_overhead != chunk_cbytes) {
1459         return BLOSC2_ERROR_WRITE_BUFFER;
1460       }
1461       if (chunk_cbytes < context->header_overhead + (nblock * context->blocksize) + bsize_) {
1462         /* Not enough input to copy block */
1463         return BLOSC2_ERROR_READ_BUFFER;
1464       }
1465     }
1466     if (!is_lazy) {
1467       src += context->header_overhead + nblock * context->blocksize;
1468     }
1469     _dest = dest + dest_offset;
1470     if (context->postfilter != NULL) {
1471       // We are making use of a postfilter, so use a temp for destination
1472       _dest = tmp;
1473     }
1474     rc = 0;
1475     switch (context->special_type) {
1476       case BLOSC2_SPECIAL_VALUE:
1477         // All repeated values
1478         rc = set_values(context->typesize, context->src, _dest, bsize_);
1479         if (rc < 0) {
1480           BLOSC_TRACE_ERROR("set_values failed");
1481           return BLOSC2_ERROR_DATA;
1482         }
1483         break;
1484       case BLOSC2_SPECIAL_NAN:
1485         rc = set_nans(context->typesize, _dest, bsize_);
1486         if (rc < 0) {
1487           BLOSC_TRACE_ERROR("set_nans failed");
1488           return BLOSC2_ERROR_DATA;
1489         }
1490         break;
1491       case BLOSC2_SPECIAL_ZERO:
1492         memset(_dest, 0, bsize_);
1493         break;
1494       case BLOSC2_SPECIAL_UNINIT:
1495         // We do nothing here
1496         break;
1497       default:
1498         memcpy(_dest, src, bsize_);
1499     }
1500     if (context->postfilter != NULL) {
1501       // Create new postfilter parameters for this block (must be private for each thread)
1502       blosc2_postfilter_params postparams;
1503       memcpy(&postparams, context->postparams, sizeof(postparams));
1504       postparams.in = tmp;
1505       postparams.out = dest + dest_offset;
1506       postparams.size = bsize;
1507       postparams.typesize = typesize;
1508       postparams.offset = nblock * context->blocksize;
1509       postparams.tid = thread_context->tid;
1510       postparams.ttmp = thread_context->tmp;
1511       postparams.ttmp_nbytes = thread_context->tmp_nbytes;
1512       postparams.ctx = context;
1513 
1514       // Execute the postfilter (the processed block will be copied to dest)
1515       if (context->postfilter(&postparams) != 0) {
1516         BLOSC_TRACE_ERROR("Execution of postfilter function failed");
1517         return BLOSC2_ERROR_POSTFILTER;
1518       }
1519     }
1520     return bsize_;
1521   }
1522 
1523   if (!is_lazy && (src_offset <= 0 || src_offset >= srcsize)) {
1524     /* Invalid block src offset encountered */
1525     return BLOSC2_ERROR_DATA;
1526   }
1527 
1528   src += src_offset;
1529   srcsize -= src_offset;
1530 
1531   int last_filter_index = last_filter(filters, 'd');
1532 
1533   if (((last_filter_index >= 0) &&
1534       (next_filter(filters, BLOSC2_MAX_FILTERS, 'd') != BLOSC_DELTA)) ||
1535       context->postfilter != NULL) {
1536    // We are making use of some filter, so use a temp for destination
1537    _dest = tmp;
1538   } else {
1539     // If no filters, or only DELTA in pipeline
1540    _dest = dest + dest_offset;
1541   }
1542 
1543   /* The number of compressed data streams for this block */
1544   if (!dont_split && !leftoverblock && !context->use_dict) {
1545     // We don't want to split when in a training dict state
1546     nstreams = (int32_t)typesize;
1547   }
1548   else {
1549     nstreams = 1;
1550   }
1551 
1552   neblock = bsize / nstreams;
1553   if (neblock == 0) {
1554     /* Not enough space to output bytes */
1555     return -1;
1556   }
1557   for (int j = 0; j < nstreams; j++) {
1558     if (srcsize < (signed)sizeof(int32_t)) {
1559       /* Not enough input to read compressed size */
1560       return BLOSC2_ERROR_READ_BUFFER;
1561     }
1562     srcsize -= sizeof(int32_t);
1563     cbytes = sw32_(src);      /* amount of compressed bytes */
1564     if (cbytes > 0) {
1565       if (srcsize < cbytes) {
1566         /* Not enough input to read compressed bytes */
1567         return BLOSC2_ERROR_READ_BUFFER;
1568       }
1569       srcsize -= cbytes;
1570     }
1571     src += sizeof(int32_t);
1572     ctbytes += (signed)sizeof(int32_t);
1573 
1574     /* Uncompress */
1575     if (cbytes == 0) {
1576       // A run of 0's
1577       memset(_dest, 0, (unsigned int)neblock);
1578       nbytes = neblock;
1579     }
1580     else if (cbytes < 0) {
1581       // A negative number means some encoding depending on the token that comes next
1582       uint8_t token;
1583 
1584       if (srcsize < (signed)sizeof(uint8_t)) {
1585         // Not enough input to read token */
1586         return BLOSC2_ERROR_READ_BUFFER;
1587       }
1588       srcsize -= sizeof(uint8_t);
1589 
1590       token = src[0];
1591       src += 1;
1592       ctbytes += 1;
1593 
1594       if (token & 0x1) {
1595         // A run of bytes that are different than 0
1596         if (cbytes < -255) {
1597           // Runs can only encode a byte
1598           return BLOSC2_ERROR_RUN_LENGTH;
1599         }
1600         uint8_t value = -cbytes;
1601         memset(_dest, value, (unsigned int)neblock);
1602       } else {
1603         BLOSC_TRACE_ERROR("Invalid or unsupported compressed stream token value - %d", token);
1604         return BLOSC2_ERROR_RUN_LENGTH;
1605       }
1606       nbytes = neblock;
1607       cbytes = 0;  // everything is encoded in the cbytes token
1608     }
1609     else if (cbytes == neblock) {
1610       memcpy(_dest, src, (unsigned int)neblock);
1611       nbytes = (int32_t)neblock;
1612     }
1613     else {
1614       if (compformat == BLOSC_BLOSCLZ_FORMAT) {
1615         nbytes = blosclz_decompress(src, cbytes, _dest, (int)neblock);
1616       }
1617       else if (compformat == BLOSC_LZ4_FORMAT) {
1618         nbytes = lz4_wrap_decompress((char*)src, (size_t)cbytes,
1619                                      (char*)_dest, (size_t)neblock);
1620       }
1621   #if defined(HAVE_ZLIB)
1622       else if (compformat == BLOSC_ZLIB_FORMAT) {
1623         nbytes = zlib_wrap_decompress((char*)src, (size_t)cbytes,
1624                                       (char*)_dest, (size_t)neblock);
1625       }
1626   #endif /*  HAVE_ZLIB */
1627   #if defined(HAVE_ZSTD)
1628       else if (compformat == BLOSC_ZSTD_FORMAT) {
1629         nbytes = zstd_wrap_decompress(thread_context,
1630                                       (char*)src, (size_t)cbytes,
1631                                       (char*)_dest, (size_t)neblock);
1632       }
1633   #endif /*  HAVE_ZSTD */
1634       else if (compformat == BLOSC_UDCODEC_FORMAT) {
1635         for (int i = 0; i < g_ncodecs; ++i) {
1636           if (g_codecs[i].compcode == context->compcode) {
1637             blosc2_dparams dparams;
1638             blosc2_ctx_get_dparams(context, &dparams);
1639             nbytes = g_codecs[i].decoder(src,
1640                                           cbytes,
1641                                           _dest,
1642                                           neblock,
1643                                           context->compcode_meta,
1644                                           &dparams);
1645             goto urcodecsuccess;
1646           }
1647         }
1648         BLOSC_TRACE_ERROR("User-defined compressor codec %d not found during decompression", context->compcode);
1649         return BLOSC2_ERROR_CODEC_SUPPORT;
1650       urcodecsuccess:
1651         ;
1652       }
1653       else {
1654         compname = clibcode_to_clibname(compformat);
1655         BLOSC_TRACE_ERROR(
1656                 "Blosc has not been compiled with decompression "
1657                 "support for '%s' format.  "
1658                 "Please recompile for adding this support.", compname);
1659         return BLOSC2_ERROR_CODEC_SUPPORT;
1660       }
1661 
1662       /* Check that decompressed bytes number is correct */
1663       if (nbytes != neblock) {
1664         return BLOSC2_ERROR_DATA;
1665       }
1666 
1667     }
1668     src += cbytes;
1669     ctbytes += cbytes;
1670     _dest += nbytes;
1671     ntbytes += nbytes;
1672   } /* Closes j < nstreams */
1673 
1674   if (last_filter_index >= 0 || context->postfilter != NULL) {
1675     /* Apply regular filter pipeline */
1676     int errcode = pipeline_backward(thread_context, bsize, dest, dest_offset, tmp, tmp2, tmp3,
1677                              last_filter_index, nblock);
1678     if (errcode < 0)
1679       return errcode;
1680   }
1681 
1682   /* Return the number of uncompressed bytes */
1683   return (int)ntbytes;
1684 }
1685 
1686 
1687 /* Serial version for compression/decompression */
serial_blosc(struct thread_context * thread_context)1688 static int serial_blosc(struct thread_context* thread_context) {
1689   blosc2_context* context = thread_context->parent_context;
1690   int32_t j, bsize, leftoverblock;
1691   int32_t cbytes;
1692   int32_t ntbytes = (int32_t)context->output_bytes;
1693   int32_t* bstarts = context->bstarts;
1694   uint8_t* tmp = thread_context->tmp;
1695   uint8_t* tmp2 = thread_context->tmp2;
1696   int dict_training = context->use_dict && (context->dict_cdict == NULL);
1697   bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
1698   if (!context->do_compress && context->special_type) {
1699     // Fake a runlen as if its a memcpyed chunk
1700     memcpyed = true;
1701   }
1702 
1703   for (j = 0; j < context->nblocks; j++) {
1704     if (context->do_compress && !memcpyed && !dict_training) {
1705       _sw32(bstarts + j, ntbytes);
1706     }
1707     bsize = context->blocksize;
1708     leftoverblock = 0;
1709     if ((j == context->nblocks - 1) && (context->leftover > 0)) {
1710       bsize = context->leftover;
1711       leftoverblock = 1;
1712     }
1713     if (context->do_compress) {
1714       if (memcpyed && !context->prefilter) {
1715         /* We want to memcpy only */
1716         memcpy(context->dest + context->header_overhead + j * context->blocksize,
1717                context->src + j * context->blocksize, (unsigned int)bsize);
1718         cbytes = (int32_t)bsize;
1719       }
1720       else {
1721         /* Regular compression */
1722         cbytes = blosc_c(thread_context, bsize, leftoverblock, ntbytes,
1723                          context->destsize, context->src, j * context->blocksize,
1724                          context->dest + ntbytes, tmp, tmp2);
1725         if (cbytes == 0) {
1726           ntbytes = 0;              /* uncompressible data */
1727           break;
1728         }
1729       }
1730     }
1731     else {
1732       /* Regular decompression */
1733       // If memcpyed we don't have a bstarts section (because it is not needed)
1734       int32_t src_offset = memcpyed ?
1735           context->header_overhead + j * context->blocksize : sw32_(bstarts + j);
1736       cbytes = blosc_d(thread_context, bsize, leftoverblock, memcpyed,
1737                        context->src, context->srcsize, src_offset, j,
1738                        context->dest, j * context->blocksize, tmp, tmp2);
1739     }
1740 
1741     if (cbytes < 0) {
1742       ntbytes = cbytes;         /* error in blosc_c or blosc_d */
1743       break;
1744     }
1745     ntbytes += cbytes;
1746   }
1747 
1748   return ntbytes;
1749 }
1750 
/* Forward declaration: this routine is handed to threads_callback below. */
static void t_blosc_do_job(void *ctxt);

/* Threaded version for compression/decompression.

   Runs the job either through the user-installed threads_callback or through
   the WAIT_INIT/WAIT_FINISH synchronization points.

   Returns context->output_bytes, or context->thread_giveup_code when that
   code is <= 0 once the run has finished. */
static int parallel_blosc(blosc2_context* context) {
#ifdef BLOSC_POSIX_BARRIERS
  /* NOTE(review): presumably consumed by the WAIT_* macros below -- confirm */
  int rc;
#endif
  /* Set sentinels */
  context->thread_giveup_code = 1;
  context->thread_nblock = -1;

  if (threads_callback) {
    /* A callback is installed: let it run t_blosc_do_job over the
       per-thread contexts */
    threads_callback(threads_callback_data, t_blosc_do_job,
                     context->nthreads, sizeof(struct thread_context), (void*) context->thread_contexts);
  }
  else {
    /* Synchronization point for all threads (wait for initialization) */
    WAIT_INIT(-1, context);

    /* Synchronization point for all threads (wait for finalization) */
    WAIT_FINISH(-1, context);
  }

  /* The sentinel was set to 1 above; a value <= 0 here is returned as-is */
  if (context->thread_giveup_code <= 0) {
    /* Compression/decompression gave up.  Return error code. */
    return context->thread_giveup_code;
  }

  /* Return the total bytes (de-)compressed in threads */
  return (int)context->output_bytes;
}
1782 
1783 /* initialize a thread_context that has already been allocated */
init_thread_context(struct thread_context * thread_context,blosc2_context * context,int32_t tid)1784 static int init_thread_context(struct thread_context* thread_context, blosc2_context* context, int32_t tid)
1785 {
1786   int32_t ebsize;
1787 
1788   thread_context->parent_context = context;
1789   thread_context->tid = tid;
1790 
1791   ebsize = context->blocksize + context->typesize * (signed)sizeof(int32_t);
1792   thread_context->tmp_nbytes = (size_t)4 * ebsize;
1793   thread_context->tmp = my_malloc(thread_context->tmp_nbytes);
1794   BLOSC_ERROR_NULL(thread_context->tmp, BLOSC2_ERROR_MEMORY_ALLOC);
1795   thread_context->tmp2 = thread_context->tmp + ebsize;
1796   thread_context->tmp3 = thread_context->tmp2 + ebsize;
1797   thread_context->tmp4 = thread_context->tmp3 + ebsize;
1798   thread_context->tmp_blocksize = context->blocksize;
1799   #if defined(HAVE_ZSTD)
1800   thread_context->zstd_cctx = NULL;
1801   thread_context->zstd_dctx = NULL;
1802   #endif
1803 
1804   /* Create the hash table for LZ4 in case we are using IPP */
1805 #ifdef HAVE_IPP
1806   IppStatus status;
1807   int inlen = thread_context->tmp_blocksize > 0 ? thread_context->tmp_blocksize : 1 << 16;
1808   int hash_size = 0;
1809   status = ippsEncodeLZ4HashTableGetSize_8u(&hash_size);
1810   if (status != ippStsNoErr) {
1811       BLOSC_TRACE_ERROR("Error in ippsEncodeLZ4HashTableGetSize_8u.");
1812   }
1813   Ipp8u *hash_table = ippsMalloc_8u(hash_size);
1814   status = ippsEncodeLZ4HashTableInit_8u(hash_table, inlen);
1815   if (status != ippStsNoErr) {
1816     BLOSC_TRACE_ERROR("Error in ippsEncodeLZ4HashTableInit_8u.");
1817   }
1818   thread_context->lz4_hash_table = hash_table;
1819 #endif
1820   return 0;
1821 }
1822 
1823 static struct thread_context*
create_thread_context(blosc2_context * context,int32_t tid)1824 create_thread_context(blosc2_context* context, int32_t tid) {
1825   struct thread_context* thread_context;
1826   thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
1827   BLOSC_ERROR_NULL(thread_context, NULL);
1828   int rc = init_thread_context(thread_context, context, tid);
1829   if (rc < 0) {
1830     return NULL;
1831   }
1832   return thread_context;
1833 }
1834 
1835 /* free members of thread_context, but not thread_context itself */
destroy_thread_context(struct thread_context * thread_context)1836 static void destroy_thread_context(struct thread_context* thread_context) {
1837   my_free(thread_context->tmp);
1838 #if defined(HAVE_ZSTD)
1839   if (thread_context->zstd_cctx != NULL) {
1840     ZSTD_freeCCtx(thread_context->zstd_cctx);
1841   }
1842   if (thread_context->zstd_dctx != NULL) {
1843     ZSTD_freeDCtx(thread_context->zstd_dctx);
1844   }
1845 #endif
1846 #ifdef HAVE_IPP
1847   if (thread_context->lz4_hash_table != NULL) {
1848     ippsFree(thread_context->lz4_hash_table);
1849   }
1850 #endif
1851 }
1852 
/* Release a thread_context entirely: first the resources it holds (through
   destroy_thread_context), then the struct itself with my_free. */
void free_thread_context(struct thread_context* thread_context) {
  destroy_thread_context(thread_context);
  my_free(thread_context);
}
1857 
1858 
/* Apply a pending change of thread count (context->new_nthreads) and make
   sure a thread pool is started when more than one thread is requested.

   Returns the thread count in effect afterwards, or
   BLOSC2_ERROR_INVALID_PARAM when the current count is not positive. */
int check_nthreads(blosc2_context* context) {
  if (context->nthreads <= 0) {
    BLOSC_TRACE_ERROR("nthreads must be a positive integer.");
    return BLOSC2_ERROR_INVALID_PARAM;
  }

  const bool count_changed = (context->nthreads != context->new_nthreads);
  if (count_changed) {
    /* The pool is only released when the old count was above one */
    if (context->nthreads > 1) {
      release_threadpool(context);
    }
    context->nthreads = context->new_nthreads;
  }

  const bool pool_missing =
      (context->new_nthreads > 1) && (context->threads_started == 0);
  if (pool_missing) {
    init_threadpool(context);
  }

  return context->nthreads;
}
1877 
1878 /* Do the compression or decompression of the buffer depending on the
1879    global params. */
do_job(blosc2_context * context)1880 static int do_job(blosc2_context* context) {
1881   int32_t ntbytes;
1882 
1883   /* Set sentinels */
1884   context->dref_not_init = 1;
1885 
1886   /* Check whether we need to restart threads */
1887   check_nthreads(context);
1888 
1889   /* Run the serial version when nthreads is 1 or when the buffers are
1890      not larger than blocksize */
1891   if (context->nthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
1892     /* The context for this 'thread' has no been initialized yet */
1893     if (context->serial_context == NULL) {
1894       context->serial_context = create_thread_context(context, 0);
1895     }
1896     else if (context->blocksize != context->serial_context->tmp_blocksize) {
1897       free_thread_context(context->serial_context);
1898       context->serial_context = create_thread_context(context, 0);
1899     }
1900     BLOSC_ERROR_NULL(context->serial_context, BLOSC2_ERROR_THREAD_CREATE);
1901     ntbytes = serial_blosc(context->serial_context);
1902   }
1903   else {
1904     ntbytes = parallel_blosc(context);
1905   }
1906 
1907   return ntbytes;
1908 }
1909 
1910 
/* Load the parameters of a compression run into `context`, let the tuner
   adjust them, and validate buffer sizes and compression level.

   Returns 1 when the context is ready, 0 when `srcsize` is above
   BLOSC_MAX_BUFFERSIZE or `destsize` is below BLOSC_MAX_OVERHEAD, and
   BLOSC2_ERROR_CODEC_PARAM when `clevel` is outside 0..9.  The size
   diagnostics are only printed when the BLOSC_WARN environment variable
   parses to a positive number. */
static int initialize_context_compression(
    blosc2_context* context, const void* src, int32_t srcsize, void* dest,
    int32_t destsize, int clevel, uint8_t const *filters,
    uint8_t const *filters_meta, int32_t typesize, int compressor,
    int32_t blocksize, int16_t new_nthreads, int16_t nthreads,
    blosc2_btune *udbtune, void *btune_config,
    blosc2_schunk* schunk) {

  /* Buffers */
  context->do_compress = 1;
  context->src = (const uint8_t*)src;
  context->srcsize = srcsize;
  context->sourcesize = srcsize;
  context->dest = (uint8_t*)dest;
  context->destsize = destsize;
  context->output_bytes = 0;

  /* Filter pipeline.  Copied element by element because the incoming arrays
     may be the context's own ones. */
  context->typesize = (int32_t)typesize;
  context->filter_flags = filters_to_flags(filters);
  for (int slot = 0; slot < BLOSC2_MAX_FILTERS; ++slot) {
    context->filters[slot] = filters[slot];
    context->filters_meta[slot] = filters_meta[slot];
  }

  /* Codec, threading and tuning state */
  context->compcode = compressor;
  context->clevel = clevel;
  context->nthreads = nthreads;
  context->new_nthreads = new_nthreads;
  context->end_threads = 0;
  context->schunk = schunk;
  context->btune = btune_config;
  context->udbtune = udbtune;
  context->blocksize = (int32_t)blocksize;

  /* Tune some compression parameters: everything when a tuner config is
     present, just the blocksize otherwise */
  if (context->btune == NULL) {
    context->udbtune->btune_next_blocksize(context);
  }
  else {
    context->udbtune->btune_next_cparams(context);
  }

  const char* warn_env = getenv("BLOSC_WARN");
  const int warn_level = (warn_env != NULL) ? (int)strtol(warn_env, NULL, 10) : 0;
  const bool verbose = warn_level > 0;

  /* Check buffer size limits */
  if (srcsize > BLOSC_MAX_BUFFERSIZE) {
    if (verbose) {
      BLOSC_TRACE_ERROR("Input buffer size cannot exceed %d bytes.",
                        BLOSC_MAX_BUFFERSIZE);
    }
    return 0;
  }
  if (destsize < BLOSC_MAX_OVERHEAD) {
    if (verbose) {
      BLOSC_TRACE_ERROR("Output buffer size should be larger than %d bytes.",
                        BLOSC_MAX_OVERHEAD);
    }
    return 0;
  }

  /* Compression level must lie in 0..9 */
  if (clevel < 0 || clevel > 9) {
    BLOSC_TRACE_ERROR("`clevel` parameter must be between 0 and 9!.");
    return BLOSC2_ERROR_CODEC_PARAM;
  }

  /* A typesize that is too large makes the buffer a 1-byte stream */
  if (context->typesize > BLOSC_MAX_TYPESIZE) {
    context->typesize = 1;
  }

  blosc2_calculate_blocks(context);

  return 1;
}
1989 
1990 
/* Prepare `context` for decompressing the chunk in `src` into `dest`.

   Loads the chunk parameters from `header`, validates them against the
   source/destination sizes and, when the chunk carries a dictionary (and
   zstd support is compiled in), builds the decompression dictionary.

   Returns 0 on success (including the header-only, zero-length chunk) or a
   negative BLOSC2_ERROR_* code:
     - whatever blosc2_initialize_context_from_header() reports,
     - BLOSC2_ERROR_WRITE_BUFFER when `dest` is too small,
     - BLOSC2_ERROR_DATA for inconsistent header information,
     - BLOSC2_ERROR_READ_BUFFER when `src` is too short,
     - BLOSC2_ERROR_CODEC_DICT for an invalid or unusable dictionary. */
static int initialize_context_decompression(blosc2_context* context, blosc_header* header, const void* src,
                                            int32_t srcsize, void* dest, int32_t destsize) {
  int32_t bstarts_end;

  context->do_compress = 0;
  context->src = (const uint8_t*)src;
  context->srcsize = srcsize;
  context->dest = (uint8_t*)dest;
  context->destsize = destsize;
  context->output_bytes = 0;
  context->end_threads = 0;

  int rc = blosc2_initialize_context_from_header(context, header);
  if (rc < 0) {
    return rc;
  }

  /* Check that we have enough space to decompress */
  if (context->sourcesize > (int32_t)context->destsize) {
    return BLOSC2_ERROR_WRITE_BUFFER;
  }

  if (context->block_maskout != NULL && context->block_maskout_nitems != context->nblocks) {
    BLOSC_TRACE_ERROR("The number of items in block_maskout (%d) must match the number"
                      " of blocks in chunk (%d).",
                      context->block_maskout_nitems, context->nblocks);
    return BLOSC2_ERROR_DATA;
  }

  context->special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
  if (context->special_type > BLOSC2_SPECIAL_LASTID) {
    BLOSC_TRACE_ERROR("Unknown special values ID (%d) ",
                      context->special_type);
    return BLOSC2_ERROR_DATA;
  }

  int memcpyed = (context->header_flags & (uint8_t) BLOSC_MEMCPYED);
  if (memcpyed && (header->cbytes != header->nbytes + context->header_overhead)) {
    BLOSC_TRACE_ERROR("Wrong header info for this memcpyed chunk");
    return BLOSC2_ERROR_DATA;
  }

  if ((header->nbytes == 0) && (header->cbytes == context->header_overhead) &&
      !context->special_type) {
    // A compressed buffer with only a header can only contain a zero-length buffer
    return 0;
  }

  context->bstarts = (int32_t *) (context->src + context->header_overhead);

  /* Work out where the bstarts section ends.  The arithmetic is done in
     64 bits so that a huge block count coming from an untrusted header
     cannot wrap around and slip past the bounds check below. */
  int64_t bstarts_end64 = (int64_t)context->header_overhead;
  if (!context->special_type && !memcpyed) {
    /* If chunk is not special or a memcpyed, we do have a bstarts section */
    bstarts_end64 += (int64_t)context->nblocks * (int64_t)sizeof(int32_t);
  }

  if ((int64_t)srcsize < bstarts_end64) {
    BLOSC_TRACE_ERROR("`bstarts` exceeds length of source buffer.");
    return BLOSC2_ERROR_READ_BUFFER;
  }
  /* Safe narrowing: the value is bounded by srcsize at this point */
  bstarts_end = (int32_t)bstarts_end64;
  srcsize -= bstarts_end;

  /* Read optional dictionary if flag set */
  if (context->blosc2_flags & BLOSC2_USEDICT) {
#if defined(HAVE_ZSTD)
    context->use_dict = 1;
    if (context->dict_ddict != NULL) {
      // Free the existing dictionary (probably from another chunk)
      ZSTD_freeDDict(context->dict_ddict);
      // Do not keep a dangling pointer around: the checks below may return
      // early, and a later call would otherwise free it a second time
      context->dict_ddict = NULL;
    }
    // The trained dictionary is after the bstarts block
    if (srcsize < (signed)sizeof(int32_t)) {
      BLOSC_TRACE_ERROR("Not enough space to read size of dictionary.");
      return BLOSC2_ERROR_READ_BUFFER;
    }
    srcsize -= (int32_t)sizeof(int32_t);
    // Read dictionary size
    context->dict_size = (size_t)sw32_(context->src + bstarts_end);
    if (context->dict_size <= 0 || context->dict_size > BLOSC2_MAXDICTSIZE) {
      BLOSC_TRACE_ERROR("Dictionary size is smaller than minimum or larger than maximum allowed.");
      return BLOSC2_ERROR_CODEC_DICT;
    }
    if (srcsize < (int32_t)context->dict_size) {
      BLOSC_TRACE_ERROR("Not enough space to read entire dictionary.");
      return BLOSC2_ERROR_READ_BUFFER;
    }
    srcsize -= (int32_t)context->dict_size;
    // Read dictionary
    context->dict_buffer = (void*)(context->src + bstarts_end + sizeof(int32_t));
    context->dict_ddict = ZSTD_createDDict(context->dict_buffer, context->dict_size);
    if (context->dict_ddict == NULL) {
      BLOSC_TRACE_ERROR("Cannot create the dictionary for decompression.");
      return BLOSC2_ERROR_CODEC_DICT;
    }
#endif   // HAVE_ZSTD
  }

  return 0;
}
2085 
/* Reset context->header_flags for the chunk about to be produced, select the
   header length, place (or drop) the bstarts table, and store the serialized
   header at the beginning of context->dest.

   `extended_header` selects BLOSC_EXTENDED_HEADER_LENGTH over
   BLOSC_MIN_HEADER_LENGTH.  Always returns 1. */
static int write_compression_header(blosc2_context* context, bool extended_header) {
  blosc_header hdr;
  const int training_dict = context->use_dict && (context->dict_cdict == NULL);

  /* clevel 0 and too-small buffers are meant to be memcpy'ed */
  context->header_flags = 0;
  if ((context->clevel == 0) || (context->sourcesize < BLOSC_MIN_BUFFERSIZE)) {
    context->header_flags |= (uint8_t)BLOSC_MEMCPYED;
  }
  const bool plain_copy = (context->header_flags & (uint8_t)BLOSC_MEMCPYED) != 0;

  if (extended_header) {
    context->header_overhead = BLOSC_EXTENDED_HEADER_LENGTH;
    /* Indicate that we are building an extended header */
    context->header_flags |= (BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE);
  }
  else {
    context->header_overhead = BLOSC_MIN_HEADER_LENGTH;
  }

  /* There is no bstarts table for memcpyed chunks, nor while training a
     dictionary with an extended header */
  const bool skip_bstarts = plain_copy || (extended_header && training_dict);
  if (skip_bstarts) {
    context->bstarts = NULL;
    context->output_bytes = context->header_overhead;
  }
  else {
    context->bstarts = (int32_t*)(context->dest + context->header_overhead);
    context->output_bytes = context->header_overhead + sizeof(int32_t) * context->nblocks;
  }

  /* When the memcpyed bit is set, there is no point in dealing with others */
  if (!plain_copy) {
    /* Mirror the active byte-shuffle, bit-shuffle and delta filters */
    const int mirrored[3] = {BLOSC_DOSHUFFLE, BLOSC_DOBITSHUFFLE, BLOSC_DODELTA};
    for (int k = 0; k < 3; ++k) {
      if (context->filter_flags & mirrored[k]) {
        context->header_flags |= mirrored[k];
      }
    }

    /* dont_split lives in bit 4 */
    const int keep_whole = !split_block(context, context->typesize,
                                        context->blocksize, extended_header);
    context->header_flags |= keep_whole << 4;

    /* The codec format starts at bit 5 */
    const uint8_t codec_format = compcode_to_compformat(context->compcode);
    context->header_flags |= codec_format << 5;
  }

  /* Build the blosc header and store it in dest */
  blosc2_intialize_header_from_context(context, &hdr, extended_header);
  memcpy(context->dest, &hdr, (extended_header) ?
    BLOSC_EXTENDED_HEADER_LENGTH : BLOSC_MIN_HEADER_LENGTH);

  return 1;
}
2162 
2163 
/* Compress the buffers already configured in `context` (the chunk header is
   expected to be in context->dest already, since the flags and cbytes fields
   are patched in place here).

   Flow:
     1. Unless the memcpyed flag is set, run the regular compression job.
        A result of 0 switches to the memcpy fallback.
     2. In memcpy mode, copy the source verbatim when it fits in dest
        together with the header; otherwise the result is 0.
     3. If the regular job produced nothing but the bstarts section plus one
        int32 per stream, mark the chunk as a special all-zeros chunk and
        shrink it to the bare header.

   Returns the number of bytes of the resulting chunk, 0 when it does not fit
   in the destination, or a negative error code from do_job(). */
int blosc_compress_context(blosc2_context* context) {
  int ntbytes = 0;
  blosc_timestamp_t last, current;
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;

  /* Start of the timed section (only consumed when a tuner is attached) */
  blosc_set_timestamp(&last);

  if (!memcpyed) {
    /* Do the actual compression */
    ntbytes = do_job(context);
    if (ntbytes < 0) {
      return ntbytes;
    }
    if (ntbytes == 0) {
      // Try out with a memcpy later on (last chance for fitting src buffer in dest).
      context->header_flags |= (uint8_t)BLOSC_MEMCPYED;
      memcpyed = true;
    }
  }

  if (memcpyed) {
    if (context->sourcesize + context->header_overhead > context->destsize) {
      /* We are exceeding maximum output size */
      ntbytes = 0;
    }
    else {
      /* The copied data goes right after the header */
      context->output_bytes = context->header_overhead;
      ntbytes = do_job(context);
      if (ntbytes < 0) {
        return ntbytes;
      }
      // Success!  update the memcpy bit in header
      context->dest[BLOSC2_CHUNK_FLAGS] = context->header_flags;
      // and clear the memcpy bit in context (for next reuse)
      context->header_flags &= ~(uint8_t)BLOSC_MEMCPYED;
    }
  }
  else {
    // Check whether we have a run for the whole chunk
    int start_csizes = context->header_overhead + 4 * context->nblocks;
    /* dont_split is stored in bit 4 of the header flags */
    int dont_split = (context->header_flags & 0x10) >> 4;
    int nstreams = context->nblocks;
    if (!dont_split) {
      // When splitting, the number of streams is computed differently
      if (context->leftover) {
        /* The leftover block contributes a single stream */
        nstreams = (context->nblocks - 1) * context->typesize + 1;
      }
      else {
        nstreams *= context->typesize;
      }
    }
    if (ntbytes == start_csizes + nstreams * sizeof(int32_t)) {
      // The streams are all zero runs (by construction).  Encode it...
      context->dest[BLOSC2_CHUNK_BLOSC2_FLAGS] |= BLOSC2_SPECIAL_ZERO << 4;
      // ...and assign the new chunk length
      ntbytes = context->header_overhead;
    }
  }

  /* Set the number of compressed bytes in header */
  _sw32(context->dest + BLOSC2_CHUNK_CBYTES, ntbytes);

  /* Set the number of bytes in dest buffer (might be useful for btune) */
  context->destsize = ntbytes;

  if (context->btune != NULL) {
    /* Report the elapsed compression time to the tuner */
    blosc_set_timestamp(&current);
    double ctime = blosc_elapsed_secs(last, current);
    context->udbtune->btune_update(context, ctime);
  }

  return ntbytes;
}
2237 
2238 
2239 /* The public secure routine for compression with context. */
blosc2_compress_ctx(blosc2_context * context,const void * src,int32_t srcsize,void * dest,int32_t destsize)2240 int blosc2_compress_ctx(blosc2_context* context, const void* src, int32_t srcsize,
2241                         void* dest, int32_t destsize) {
2242   int error, cbytes;
2243 
2244   if (context->do_compress != 1) {
2245     BLOSC_TRACE_ERROR("Context is not meant for compression.  Giving up.");
2246     return BLOSC2_ERROR_INVALID_PARAM;
2247   }
2248 
2249   error = initialize_context_compression(
2250     context, src, srcsize, dest, destsize,
2251     context->clevel, context->filters, context->filters_meta,
2252     context->typesize, context->compcode, context->blocksize,
2253     context->new_nthreads, context->nthreads,
2254     context->udbtune, context->btune, context->schunk);
2255   if (error <= 0) {
2256     return error;
2257   }
2258 
2259   /* Write the extended header */
2260   error = write_compression_header(context, true);
2261   if (error < 0) {
2262     return error;
2263   }
2264 
2265   cbytes = blosc_compress_context(context);
2266   if (cbytes < 0) {
2267     return cbytes;
2268   }
2269 
2270   if (context->use_dict && context->dict_cdict == NULL) {
2271 
2272     if (context->compcode != BLOSC_ZSTD) {
2273       const char* compname;
2274       compname = clibcode_to_clibname(context->compcode);
2275       BLOSC_TRACE_ERROR("Codec %s does not support dicts.  Giving up.",
2276                         compname);
2277       return BLOSC2_ERROR_CODEC_DICT;
2278     }
2279 
2280 #ifdef HAVE_ZSTD
2281     // Build the dictionary out of the filters outcome and compress with it
2282     int32_t dict_maxsize = BLOSC2_MAXDICTSIZE;
2283     // Do not make the dict more than 5% larger than uncompressed buffer
2284     if (dict_maxsize > srcsize / 20) {
2285       dict_maxsize = srcsize / 20;
2286     }
2287     void* samples_buffer = context->dest + context->header_overhead;
2288     unsigned nblocks = 8;  // the minimum that accepts zstd as of 1.4.0
2289     unsigned sample_fraction = 1;  // 1 allows to use most of the chunk for training
2290     size_t sample_size = context->sourcesize / nblocks / sample_fraction;
2291 
2292     // Populate the samples sizes for training the dictionary
2293     size_t* samples_sizes = malloc(nblocks * sizeof(void*));
2294     BLOSC_ERROR_NULL(samples_sizes, BLOSC2_ERROR_MEMORY_ALLOC);
2295     for (size_t i = 0; i < nblocks; i++) {
2296       samples_sizes[i] = sample_size;
2297     }
2298 
2299     // Train from samples
2300     void* dict_buffer = malloc(dict_maxsize);
2301     BLOSC_ERROR_NULL(dict_buffer, BLOSC2_ERROR_MEMORY_ALLOC);
2302     int32_t dict_actual_size = (int32_t)ZDICT_trainFromBuffer(dict_buffer, dict_maxsize, samples_buffer, samples_sizes, nblocks);
2303 
2304     // TODO: experiment with parameters of low-level fast cover algorithm
2305     // Note that this API is still unstable.  See: https://github.com/facebook/zstd/issues/1599
2306     // ZDICT_fastCover_params_t fast_cover_params;
2307     // memset(&fast_cover_params, 0, sizeof(fast_cover_params));
2308     // fast_cover_params.d = nblocks;
2309     // fast_cover_params.steps = 4;
2310     // fast_cover_params.zParams.compressionLevel = context->clevel;
2311     //size_t dict_actual_size = ZDICT_optimizeTrainFromBuffer_fastCover(dict_buffer, dict_maxsize, samples_buffer, samples_sizes, nblocks, &fast_cover_params);
2312 
2313     if (ZDICT_isError(dict_actual_size) != ZSTD_error_no_error) {
2314       BLOSC_TRACE_ERROR("Error in ZDICT_trainFromBuffer(): '%s'."
2315                         "  Giving up.", ZDICT_getErrorName(dict_actual_size));
2316       return BLOSC2_ERROR_CODEC_DICT;
2317     }
2318     assert(dict_actual_size > 0);
2319     free(samples_sizes);
2320 
2321     // Update bytes counter and pointers to bstarts for the new compressed buffer
2322     context->bstarts = (int32_t*)(context->dest + context->header_overhead);
2323     context->output_bytes = context->header_overhead + sizeof(int32_t) * context->nblocks;
2324     /* Write the size of trained dict at the end of bstarts */
2325     _sw32(context->dest + context->output_bytes, (int32_t)dict_actual_size);
2326     context->output_bytes += sizeof(int32_t);
2327     /* Write the trained dict afterwards */
2328     context->dict_buffer = context->dest + context->output_bytes;
2329     memcpy(context->dict_buffer, dict_buffer, (unsigned int)dict_actual_size);
2330     context->dict_cdict = ZSTD_createCDict(dict_buffer, dict_actual_size, 1);  // TODO: use get_accel()
2331     free(dict_buffer);      // the dictionary is copied in the header now
2332     context->output_bytes += (int32_t)dict_actual_size;
2333     context->dict_size = dict_actual_size;
2334 
2335     /* Compress with dict */
2336     cbytes = blosc_compress_context(context);
2337 
2338     // Invalidate the dictionary for compressing other chunks using the same context
2339     context->dict_buffer = NULL;
2340     ZSTD_freeCDict(context->dict_cdict);
2341     context->dict_cdict = NULL;
2342 #endif  // HAVE_ZSTD
2343   }
2344 
2345   return cbytes;
2346 }
2347 
2348 
build_filters(const int doshuffle,const int delta,const size_t typesize,uint8_t * filters)2349 void build_filters(const int doshuffle, const int delta,
2350                    const size_t typesize, uint8_t* filters) {
2351 
2352   /* Fill the end part of the filter pipeline */
2353   if ((doshuffle == BLOSC_SHUFFLE) && (typesize > 1))
2354     filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_SHUFFLE;
2355   if (doshuffle == BLOSC_BITSHUFFLE)
2356     filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_BITSHUFFLE;
2357   if (delta)
2358     filters[BLOSC2_MAX_FILTERS - 2] = BLOSC_DELTA;
2359 }
2360 
2361 /* The public secure routine for compression. */
blosc2_compress(int clevel,int doshuffle,int32_t typesize,const void * src,int32_t srcsize,void * dest,int32_t destsize)2362 int blosc2_compress(int clevel, int doshuffle, int32_t typesize,
2363                     const void* src, int32_t srcsize, void* dest, int32_t destsize) {
2364   int error;
2365   int result;
2366   char* envvar;
2367 
2368   /* Check whether the library should be initialized */
2369   if (!g_initlib) blosc_init();
2370 
2371   /* Check for a BLOSC_CLEVEL environment variable */
2372   envvar = getenv("BLOSC_CLEVEL");
2373   if (envvar != NULL) {
2374     long value;
2375     value = strtol(envvar, NULL, 10);
2376     if ((value != EINVAL) && (value >= 0)) {
2377       clevel = (int)value;
2378     }
2379   }
2380 
2381   /* Check for a BLOSC_SHUFFLE environment variable */
2382   envvar = getenv("BLOSC_SHUFFLE");
2383   if (envvar != NULL) {
2384     if (strcmp(envvar, "NOSHUFFLE") == 0) {
2385       doshuffle = BLOSC_NOSHUFFLE;
2386     }
2387     if (strcmp(envvar, "SHUFFLE") == 0) {
2388       doshuffle = BLOSC_SHUFFLE;
2389     }
2390     if (strcmp(envvar, "BITSHUFFLE") == 0) {
2391       doshuffle = BLOSC_BITSHUFFLE;
2392     }
2393   }
2394 
2395   /* Check for a BLOSC_DELTA environment variable */
2396   envvar = getenv("BLOSC_DELTA");
2397   if (envvar != NULL) {
2398     if (strcmp(envvar, "1") == 0) {
2399       blosc_set_delta(1);
2400     } else {
2401       blosc_set_delta(0);
2402     }
2403   }
2404 
2405   /* Check for a BLOSC_TYPESIZE environment variable */
2406   envvar = getenv("BLOSC_TYPESIZE");
2407   if (envvar != NULL) {
2408     long value;
2409     value = strtol(envvar, NULL, 10);
2410     if ((value != EINVAL) && (value > 0)) {
2411       typesize = (size_t)value;
2412     }
2413   }
2414 
2415   /* Check for a BLOSC_COMPRESSOR environment variable */
2416   envvar = getenv("BLOSC_COMPRESSOR");
2417   if (envvar != NULL) {
2418     result = blosc_set_compressor(envvar);
2419     if (result < 0) { return result; }
2420   }
2421 
  /* Check for a BLOSC_BLOCKSIZE environment variable */
2423   envvar = getenv("BLOSC_BLOCKSIZE");
2424   if (envvar != NULL) {
2425     long blocksize;
2426     blocksize = strtol(envvar, NULL, 10);
2427     if ((blocksize != EINVAL) && (blocksize > 0)) {
2428       blosc_set_blocksize((size_t)blocksize);
2429     }
2430   }
2431 
2432   /* Check for a BLOSC_NTHREADS environment variable */
2433   envvar = getenv("BLOSC_NTHREADS");
2434   if (envvar != NULL) {
2435     long nthreads;
2436     nthreads = strtol(envvar, NULL, 10);
2437     if ((nthreads != EINVAL) && (nthreads > 0)) {
2438       result = blosc_set_nthreads((int)nthreads);
2439       if (result < 0) { return result; }
2440     }
2441   }
2442 
2443   /* Check for a BLOSC_NOLOCK environment variable.  It is important
2444      that this should be the last env var so that it can take the
2445      previous ones into account */
2446   envvar = getenv("BLOSC_NOLOCK");
2447   if (envvar != NULL) {
2448     // TODO: here is the only place that returns an extended header from
2449     //   a blosc_compress() call.  This should probably be fixed.
2450     const char *compname;
2451     blosc2_context *cctx;
2452     blosc2_cparams cparams = BLOSC2_CPARAMS_DEFAULTS;
2453 
2454     blosc_compcode_to_compname(g_compressor, &compname);
2455     /* Create a context for compression */
2456     build_filters(doshuffle, g_delta, typesize, cparams.filters);
2457     // TODO: cparams can be shared in a multithreaded environment.  do a copy!
2458     cparams.typesize = (uint8_t)typesize;
2459     cparams.compcode = (uint8_t)g_compressor;
2460     cparams.clevel = (uint8_t)clevel;
2461     cparams.nthreads = g_nthreads;
2462     cctx = blosc2_create_cctx(cparams);
2463     /* Do the actual compression */
2464     result = blosc2_compress_ctx(cctx, src, srcsize, dest, destsize);
2465     /* Release context resources */
2466     blosc2_free_ctx(cctx);
2467     return result;
2468   }
2469 
2470   pthread_mutex_lock(&global_comp_mutex);
2471 
2472   /* Initialize a context compression */
2473   uint8_t* filters = calloc(1, BLOSC2_MAX_FILTERS);
2474   BLOSC_ERROR_NULL(filters, BLOSC2_ERROR_MEMORY_ALLOC);
2475   uint8_t* filters_meta = calloc(1, BLOSC2_MAX_FILTERS);
2476   BLOSC_ERROR_NULL(filters_meta, BLOSC2_ERROR_MEMORY_ALLOC);
2477   build_filters(doshuffle, g_delta, typesize, filters);
2478   error = initialize_context_compression(
2479     g_global_context, src, srcsize, dest, destsize, clevel, filters,
2480     filters_meta, (int32_t)typesize, g_compressor, g_force_blocksize, g_nthreads, g_nthreads,
2481     &BTUNE_DEFAULTS, NULL, g_schunk);
2482   free(filters);
2483   free(filters_meta);
2484   if (error <= 0) {
2485     pthread_mutex_unlock(&global_comp_mutex);
2486     return error;
2487   }
2488 
2489   envvar = getenv("BLOSC_BLOSC1_COMPAT");
2490   if (envvar != NULL) {
2491     /* Write chunk header without extended header (Blosc1 compatibility mode) */
2492     error = write_compression_header(g_global_context, false);
2493   }
2494   else {
2495     error = write_compression_header(g_global_context, true);
2496   }
2497   if (error < 0) {
2498     pthread_mutex_unlock(&global_comp_mutex);
2499     return error;
2500   }
2501 
2502   result = blosc_compress_context(g_global_context);
2503 
2504   pthread_mutex_unlock(&global_comp_mutex);
2505 
2506   return result;
2507 }
2508 
2509 
/* Public Blosc1-style compression entry point.
 *
 * Narrows the size_t arguments to int32_t and forwards everything to
 * blosc2_compress(); the value returned by that call is passed through. */
int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
                   const void* src, void* dest, size_t destsize) {
  const int32_t typesize32 = (int32_t)typesize;
  const int32_t srcsize32 = (int32_t)nbytes;
  const int32_t destsize32 = (int32_t)destsize;

  return blosc2_compress(clevel, doshuffle, typesize32, src, srcsize32, dest, destsize32);
}
2515 
2516 
2517 
/* Decompress the chunk in `src` into `dest` using an already created context.
 *
 * Steps: parse the chunk header, verify that `dest` can hold the number of
 * uncompressed bytes announced by the header, prepare the context for
 * decompression and run the job.
 *
 * Returns the value produced by do_job() (the decompressed size), or a
 * negative error code: the one propagated from the failing step, or
 * BLOSC2_ERROR_WRITE_BUFFER when `destsize` is smaller than header.nbytes. */
int blosc_run_decompression_with_context(blosc2_context* context, const void* src, int32_t srcsize,
                                         void* dest, int32_t destsize) {
  blosc_header header;
  int32_t ntbytes;
  int rc;

  rc = read_chunk_header(src, srcsize, true, &header);
  if (rc < 0) {
    return rc;
  }

  if (header.nbytes > destsize) {
    // Not enough space for writing into the destination
    return BLOSC2_ERROR_WRITE_BUFFER;
  }

  rc = initialize_context_decompression(context, &header, src, srcsize, dest, destsize);
  if (rc < 0) {
    return rc;
  }

  /* Do the actual decompression */
  ntbytes = do_job(context);
  if (ntbytes < 0) {
    return ntbytes;
  }

  /* Guaranteed by the header.nbytes check above unless the job misbehaved */
  assert(ntbytes <= (int32_t)destsize);
  return ntbytes;
}
2549 
2550 
2551 /* The public secure routine for decompression with context. */
blosc2_decompress_ctx(blosc2_context * context,const void * src,int32_t srcsize,void * dest,int32_t destsize)2552 int blosc2_decompress_ctx(blosc2_context* context, const void* src, int32_t srcsize,
2553                           void* dest, int32_t destsize) {
2554   int result;
2555 
2556   if (context->do_compress != 0) {
2557     BLOSC_TRACE_ERROR("Context is not meant for decompression.  Giving up.");
2558     return BLOSC2_ERROR_INVALID_PARAM;
2559   }
2560 
2561   result = blosc_run_decompression_with_context(context, src, srcsize, dest, destsize);
2562 
2563   // Reset a possible block_maskout
2564   if (context->block_maskout != NULL) {
2565     free(context->block_maskout);
2566     context->block_maskout = NULL;
2567   }
2568   context->block_maskout_nitems = 0;
2569 
2570   return result;
2571 }
2572 
2573 
2574 /* The public secure routine for decompression. */
blosc2_decompress(const void * src,int32_t srcsize,void * dest,int32_t destsize)2575 int blosc2_decompress(const void* src, int32_t srcsize, void* dest, int32_t destsize) {
2576   int result;
2577   char* envvar;
2578   long nthreads;
2579   blosc2_context *dctx;
2580   blosc2_dparams dparams = BLOSC2_DPARAMS_DEFAULTS;
2581 
2582   /* Check whether the library should be initialized */
2583   if (!g_initlib) blosc_init();
2584 
2585   /* Check for a BLOSC_NTHREADS environment variable */
2586   envvar = getenv("BLOSC_NTHREADS");
2587   if (envvar != NULL) {
2588     nthreads = strtol(envvar, NULL, 10);
2589     if ((nthreads != EINVAL) && (nthreads > 0)) {
2590       result = blosc_set_nthreads((int)nthreads);
2591       if (result < 0) { return result; }
2592     }
2593   }
2594 
2595   /* Check for a BLOSC_NOLOCK environment variable.  It is important
2596      that this should be the last env var so that it can take the
2597      previous ones into account */
2598   envvar = getenv("BLOSC_NOLOCK");
2599   if (envvar != NULL) {
2600     dparams.nthreads = g_nthreads;
2601     dctx = blosc2_create_dctx(dparams);
2602     result = blosc2_decompress_ctx(dctx, src, srcsize, dest, destsize);
2603     blosc2_free_ctx(dctx);
2604     return result;
2605   }
2606 
2607   pthread_mutex_lock(&global_comp_mutex);
2608 
2609   result = blosc_run_decompression_with_context(
2610           g_global_context, src, srcsize, dest, destsize);
2611 
2612   pthread_mutex_unlock(&global_comp_mutex);
2613 
2614   return result;
2615 }
2616 
2617 
/* Public Blosc1-style decompression entry point.
 *
 * This signature carries no compressed size, so INT32_MAX is passed as the
 * source size; `destsize` is narrowed to int32_t before delegating to
 * blosc2_decompress(). */
int blosc_decompress(const void* src, void* dest, size_t destsize) {
  const int32_t srcsize_unknown = INT32_MAX;
  const int32_t destsize32 = (int32_t)destsize;

  return blosc2_decompress(src, srcsize_unknown, dest, destsize32);
}
2622 
2623 
2624 /* Specific routine optimized for decompression a small number of
2625    items out of a compressed chunk.  This does not use threads because
2626    it would affect negatively to performance. */
_blosc_getitem(blosc2_context * context,blosc_header * header,const void * src,int32_t srcsize,int start,int nitems,void * dest,int32_t destsize)2627 int _blosc_getitem(blosc2_context* context, blosc_header* header, const void* src, int32_t srcsize,
2628                    int start, int nitems, void* dest, int32_t destsize) {
2629   uint8_t* _src = (uint8_t*)(src);  /* current pos for source buffer */
2630   uint8_t* _dest = (uint8_t*)(dest);
2631   int32_t ntbytes = 0;              /* the number of uncompressed bytes */
2632   int32_t bsize, bsize2, ebsize, leftoverblock;
2633   int32_t cbytes;
2634   int32_t startb, stopb;
2635   int32_t stop = start + nitems;
2636   int j, rc;
2637 
2638   if (nitems == 0) {
2639     // We have nothing to do
2640     return 0;
2641   }
2642   if (nitems * header->typesize > destsize) {
2643     BLOSC_TRACE_ERROR("`nitems`*`typesize` out of dest bounds.");
2644     return BLOSC2_ERROR_WRITE_BUFFER;
2645   }
2646 
2647   context->bstarts = (int32_t*)(_src + context->header_overhead);
2648 
2649   /* Check region boundaries */
2650   if ((start < 0) || (start * header->typesize > header->nbytes)) {
2651     BLOSC_TRACE_ERROR("`start` out of bounds.");
2652     return BLOSC2_ERROR_INVALID_PARAM;
2653   }
2654 
2655   if ((stop < 0) || (stop * header->typesize > header->nbytes)) {
2656     BLOSC_TRACE_ERROR("`start`+`nitems` out of bounds.");
2657     return BLOSC2_ERROR_INVALID_PARAM;
2658   }
2659 
2660   if (!context->special_type &&
2661       (_src + srcsize < (uint8_t *)(context->bstarts + context->nblocks))) {
2662     BLOSC_TRACE_ERROR("`bstarts` out of bounds.");
2663     return BLOSC2_ERROR_READ_BUFFER;
2664   }
2665 
2666   bool memcpyed = header->flags & (uint8_t)BLOSC_MEMCPYED;
2667   if (context->special_type) {
2668     // Fake a runlen as if its a memcpyed chunk
2669     memcpyed = true;
2670   }
2671 
2672   bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
2673                   (context->blosc2_flags & 0x08u) && !context->special_type);
2674   if (memcpyed && !is_lazy && !context->postfilter) {
2675     // Short-circuit for (non-lazy) memcpyed or special values
2676     ntbytes = nitems * header->typesize;
2677     switch (context->special_type) {
2678       case BLOSC2_SPECIAL_VALUE:
2679         // All repeated values
2680         rc = set_values(context->typesize, _src, _dest, ntbytes);
2681         if (rc < 0) {
2682           BLOSC_TRACE_ERROR("set_values failed");
2683           return BLOSC2_ERROR_DATA;
2684         }
2685         break;
2686       case BLOSC2_SPECIAL_NAN:
2687         rc = set_nans(context->typesize, _dest, ntbytes);
2688         if (rc < 0) {
2689           BLOSC_TRACE_ERROR("set_nans failed");
2690           return BLOSC2_ERROR_DATA;
2691         }
2692         break;
2693       case BLOSC2_SPECIAL_ZERO:
2694         memset(_dest, 0, ntbytes);
2695         break;
2696       case BLOSC2_SPECIAL_UNINIT:
2697         // We do nothing here
2698         break;
2699       case BLOSC2_NO_SPECIAL:
2700         _src += context->header_overhead + start * context->typesize;
2701         memcpy(_dest, _src, ntbytes);
2702         break;
2703       default:
2704         BLOSC_TRACE_ERROR("Unhandled special value case");
2705         return -1;
2706     }
2707     return ntbytes;
2708   }
2709 
2710   ebsize = header->blocksize + header->typesize * (signed)sizeof(int32_t);
2711   struct thread_context* scontext = context->serial_context;
2712   /* Resize the temporaries in serial context if needed */
2713   if (header->blocksize > scontext->tmp_blocksize) {
2714     my_free(scontext->tmp);
2715     scontext->tmp_nbytes = (size_t)4 * ebsize;
2716     scontext->tmp = my_malloc(scontext->tmp_nbytes);
2717     BLOSC_ERROR_NULL(scontext->tmp, BLOSC2_ERROR_MEMORY_ALLOC);
2718     scontext->tmp2 = scontext->tmp + ebsize;
2719     scontext->tmp3 = scontext->tmp2 + ebsize;
2720     scontext->tmp4 = scontext->tmp3 + ebsize;
2721     scontext->tmp_blocksize = (int32_t)header->blocksize;
2722   }
2723 
2724   for (j = 0; j < context->nblocks; j++) {
2725     bsize = header->blocksize;
2726     leftoverblock = 0;
2727     if ((j == context->nblocks - 1) && (context->leftover > 0)) {
2728       bsize = context->leftover;
2729       leftoverblock = 1;
2730     }
2731 
2732     /* Compute start & stop for each block */
2733     startb = start * header->typesize - j * header->blocksize;
2734     stopb = stop * header->typesize - j * header->blocksize;
2735     if (stopb <= 0) {
2736       // We can exit as soon as this block is beyond stop
2737       break;
2738     }
2739     if (startb >= header->blocksize) {
2740       continue;
2741     }
2742     if (startb < 0) {
2743       startb = 0;
2744     }
2745     if (stopb > header->blocksize) {
2746       stopb = header->blocksize;
2747     }
2748     bsize2 = stopb - startb;
2749 
2750     /* Do the actual data copy */
2751     // Regular decompression.  Put results in tmp2.
2752     // If the block is aligned and the worst case fits in destination, let's avoid a copy
2753     bool get_single_block = ((startb == 0) && (bsize == nitems * header->typesize));
2754     uint8_t* tmp2 = get_single_block ? dest : scontext->tmp2;
2755 
2756     // If memcpyed we don't have a bstarts section (because it is not needed)
2757     int32_t src_offset = memcpyed ?
2758       context->header_overhead + j * bsize : sw32_(context->bstarts + j);
2759 
2760     int cbytes = blosc_d(context->serial_context, bsize, leftoverblock, memcpyed,
2761                          src, srcsize, src_offset, j,
2762                          tmp2, 0, scontext->tmp, scontext->tmp3);
2763     if (cbytes < 0) {
2764       ntbytes = cbytes;
2765       break;
2766     }
2767     if (!get_single_block) {
2768       /* Copy to destination */
2769       memcpy((uint8_t *) dest + ntbytes, tmp2 + startb, (unsigned int) bsize2);
2770     }
2771     ntbytes += bsize2;
2772   }
2773 
2774   return ntbytes;
2775 }
2776 
blosc2_getitem(const void * src,int32_t srcsize,int start,int nitems,void * dest,int32_t destsize)2777 int blosc2_getitem(const void* src, int32_t srcsize, int start, int nitems, void* dest, int32_t destsize) {
2778   blosc2_context context;
2779   int result;
2780 
2781   /* Minimally populate the context */
2782   memset(&context, 0, sizeof(blosc2_context));
2783 
2784   context.schunk = g_schunk;
2785   context.nthreads = 1;  // force a serial decompression; fixes #95
2786 
2787   /* Call the actual getitem function */
2788   result = blosc2_getitem_ctx(&context, src, srcsize, start, nitems, dest, destsize);
2789 
2790   /* Release resources */
2791   if (context.serial_context != NULL) {
2792     free_thread_context(context.serial_context);
2793   }
2794   return result;
2795 }
2796 
/* Specific routine optimized for decompressing a small number of
   items out of a compressed chunk.  Public non-contextual API.
   No buffer sizes are available in this signature, so INT32_MAX is
   forwarded as both the source and the destination size. */
int blosc_getitem(const void* src, int start, int nitems, void* dest) {
  const int32_t size_unknown = INT32_MAX;

  return blosc2_getitem(src, size_unknown, start, nitems, dest, size_unknown);
}
2802 
/* Contextual getitem: extract `nitems` items starting at `start` from the
 * chunk in `src` into `dest`, using (and updating) `context`.
 *
 * Returns the result of _blosc_getitem(), or a negative error code when the
 * header cannot be read, the context cannot be initialized from it, or the
 * serial thread context cannot be created. */
int blosc2_getitem_ctx(blosc2_context* context, const void* src, int32_t srcsize,
    int start, int nitems, void* dest, int32_t destsize) {
  blosc_header header;

  /* The chunk header drives everything that follows */
  int rc = read_chunk_header((uint8_t *) src, srcsize, true, &header);
  if (rc < 0) {
    return rc;
  }

  /* Minimally populate the context */
  context->src = src;
  context->srcsize = srcsize;
  context->dest = dest;
  context->destsize = destsize;

  rc = blosc2_initialize_context_from_header(context, &header);
  if (rc < 0) {
    return rc;
  }

  /* Create the serial thread context on first use */
  if (context->serial_context == NULL) {
    context->serial_context = create_thread_context(context, 0);
  }
  BLOSC_ERROR_NULL(context->serial_context, BLOSC2_ERROR_THREAD_CREATE);

  /* Call the actual getitem function */
  return _blosc_getitem(context, &header, src, srcsize, start, nitems, dest, destsize);
}
2833 
/* Execute a single compression/decompression job for a single thread_context.
 *
 * `ctxt` is a `struct thread_context*`; the job parameters are read from its
 * parent blosc2_context.  Blocks are handed out either statically (each
 * thread takes a contiguous range derived from its tid) or dynamically
 * (threads pull the next block index from context->thread_nblock under
 * count_mutex).  Failures are reported through context->thread_giveup_code:
 * a negative value carries the error returned by blosc_c()/blosc_d(), and 0
 * marks a buffer that did not compress into the available space. */
static void t_blosc_do_job(void *ctxt)
{
  struct thread_context* thcontext = (struct thread_context*)ctxt;
  blosc2_context* context = thcontext->parent_context;
  int32_t cbytes;
  int32_t ntdest;
  int32_t tblocks;               /* number of blocks per thread */
  int32_t tblock;                /* limit block on a thread */
  int32_t nblock_;              /* private copy of nblock */
  int32_t bsize;
  int32_t leftoverblock;
  /* Parameters for threads */
  int32_t blocksize;
  int32_t ebsize;
  int32_t srcsize;
  bool compress = context->do_compress != 0;
  int32_t maxbytes;
  int32_t nblocks;
  int32_t leftover;
  int32_t leftover2;
  int32_t* bstarts;
  const uint8_t* src;
  uint8_t* dest;
  uint8_t* tmp;
  uint8_t* tmp2;
  uint8_t* tmp3;

  /* Get parameters for this thread before entering the main loop */
  blocksize = context->blocksize;
  ebsize = blocksize + context->typesize * sizeof(int32_t);
  maxbytes = context->destsize;
  nblocks = context->nblocks;
  leftover = context->leftover;
  bstarts = context->bstarts;
  src = context->src;
  srcsize = context->srcsize;
  dest = context->dest;

  /* Resize the temporaries if needed: one allocation of 4 * ebsize bytes
     split into four consecutive buffers (tmp, tmp2, tmp3, tmp4) */
  if (blocksize > thcontext->tmp_blocksize) {
    my_free(thcontext->tmp);
    thcontext->tmp_nbytes = (size_t) 4 * ebsize;
    /* NOTE(review): the result of my_malloc() is not checked here */
    thcontext->tmp = my_malloc(thcontext->tmp_nbytes);
    thcontext->tmp2 = thcontext->tmp + ebsize;
    thcontext->tmp3 = thcontext->tmp2 + ebsize;
    thcontext->tmp4 = thcontext->tmp3 + ebsize;
    thcontext->tmp_blocksize = blocksize;
  }

  tmp = thcontext->tmp;
  tmp2 = thcontext->tmp2;
  tmp3 = thcontext->tmp3;

  // Determine whether we can do a static distribution of workload among different threads
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
  if (!context->do_compress && context->special_type) {
    // Fake a runlen as if its a memcpyed chunk
    memcpyed = true;
  }

  /* Static scheduling is used for decompression and for memcpyed chunks,
     provided no block mask-out is set */
  bool static_schedule = (!compress || memcpyed) && context->block_maskout == NULL;
  if (static_schedule) {
      /* Blocks per thread (rounded up when nblocks is not a multiple of nthreads) */
      tblocks = nblocks / context->nthreads;
      leftover2 = nblocks % context->nthreads;
      tblocks = (leftover2 > 0) ? tblocks + 1 : tblocks;
      /* This thread handles blocks [nblock_, tblock) */
      nblock_ = thcontext->tid * tblocks;
      tblock = nblock_ + tblocks;
      if (tblock > nblocks) {
          tblock = nblocks;
      }
  }
  else {
    // Use dynamic schedule via a queue.  Get the next block.
    pthread_mutex_lock(&context->count_mutex);
    context->thread_nblock++;
    nblock_ = context->thread_nblock;
    pthread_mutex_unlock(&context->count_mutex);
    tblock = nblocks;
  }

  /* Loop over blocks; stops as soon as any thread lowers the giveup code.
     NOTE(review): thread_giveup_code is read here without holding count_mutex */
  leftoverblock = 0;
  while ((nblock_ < tblock) && (context->thread_giveup_code > 0)) {
    bsize = blocksize;
    /* The last block may be shorter than blocksize */
    if (nblock_ == (nblocks - 1) && (leftover > 0)) {
      bsize = leftover;
      leftoverblock = 1;
    }
    if (compress) {
      if (memcpyed) {
        if (!context->prefilter) {
          /* We want to memcpy only */
          memcpy(dest + context->header_overhead + nblock_ * blocksize,
                 src + nblock_ * blocksize, (unsigned int) bsize);
          cbytes = (int32_t) bsize;
        }
        else {
          /* Only the prefilter has to be executed, and this is done in blosc_c().
           * However, no further actions are needed, so we can put the result
           * directly in dest. */
          cbytes = blosc_c(thcontext, bsize, leftoverblock, 0,
                           ebsize, src, nblock_ * blocksize,
                           dest + context->header_overhead + nblock_ * blocksize,
                           tmp, tmp3);
        }
      }
      else {
        /* Regular compression: the compressed block lands in tmp2 and is
           copied to its final position further below */
        cbytes = blosc_c(thcontext, bsize, leftoverblock, 0,
                          ebsize, src, nblock_ * blocksize, tmp2, tmp, tmp3);
      }
    }
    else {
      /* Regular decompression */
      if (context->special_type == BLOSC2_NO_SPECIAL && !memcpyed &&
          (srcsize < (int32_t)(context->header_overhead + (sizeof(int32_t) * nblocks)))) {
        /* Not enough input to read all `bstarts` */
        cbytes = -1;
      }
      else {
        // If memcpyed we don't have a bstarts section (because it is not needed)
        int32_t src_offset = memcpyed ?
            context->header_overhead + nblock_ * blocksize : sw32_(bstarts + nblock_);
        cbytes = blosc_d(thcontext, bsize, leftoverblock, memcpyed,
                          src, srcsize, src_offset, nblock_,
                          dest, nblock_ * blocksize, tmp, tmp2);
      }
    }

    /* Check whether current thread has to giveup */
    if (context->thread_giveup_code <= 0) {
      break;
    }

    /* Check results for the compressed/decompressed block */
    if (cbytes < 0) {            /* compr/decompr failure */
      /* Set giveup_code error */
      pthread_mutex_lock(&context->count_mutex);
      context->thread_giveup_code = cbytes;
      pthread_mutex_unlock(&context->count_mutex);
      break;
    }

    if (compress && !memcpyed) {
      /* Start critical section: reserve room in dest for this block and
         fetch the next block index */
      pthread_mutex_lock(&context->count_mutex);
      ntdest = context->output_bytes;
      // Note: do not use a typical local dict_training variable here
      // because it is probably cached from previous calls if the number of
      // threads does not change (the usual thing).
      if (!(context->use_dict && context->dict_cdict == NULL)) {
        /* Record where this block starts in the output */
        _sw32(bstarts + nblock_, (int32_t) ntdest);
      }

      if ((cbytes == 0) || (ntdest + cbytes > maxbytes)) {
        context->thread_giveup_code = 0;  /* uncompressible buf */
        pthread_mutex_unlock(&context->count_mutex);
        break;
      }
      context->thread_nblock++;
      nblock_ = context->thread_nblock;
      context->output_bytes += cbytes;
      pthread_mutex_unlock(&context->count_mutex);
      /* End of critical section */

      /* Copy the compressed buffer to destination (outside the lock: the
         range [ntdest, ntdest + cbytes) was reserved above) */
      memcpy(dest + ntdest, tmp2, (unsigned int) cbytes);
    }
    else if (static_schedule) {
      /* Static schedule: simply advance within this thread's own range */
      nblock_++;
    }
    else {
      /* Dynamic schedule without bstarts bookkeeping: take the next block */
      pthread_mutex_lock(&context->count_mutex);
      context->thread_nblock++;
      nblock_ = context->thread_nblock;
      context->output_bytes += cbytes;
      pthread_mutex_unlock(&context->count_mutex);
    }

  } /* closes while (nblock_) */

  /* With a static schedule the output size is set directly from the source
     size (plus the header when compressing) */
  if (static_schedule) {
    context->output_bytes = context->sourcesize;
    if (compress) {
      context->output_bytes += context->header_overhead;
    }
  }

}
3025 
3026 /* Decompress & unshuffle several blocks in a single thread */
t_blosc(void * ctxt)3027 static void* t_blosc(void* ctxt) {
3028   struct thread_context* thcontext = (struct thread_context*)ctxt;
3029   blosc2_context* context = thcontext->parent_context;
3030 #ifdef BLOSC_POSIX_BARRIERS
3031   int rc;
3032 #endif
3033 
3034   while (1) {
3035     /* Synchronization point for all threads (wait for initialization) */
3036     WAIT_INIT(NULL, context);
3037 
3038     if (context->end_threads) {
3039       break;
3040     }
3041 
3042     t_blosc_do_job(ctxt);
3043 
3044     /* Meeting point for all threads (wait for finalization) */
3045     WAIT_FINISH(NULL, context);
3046   }
3047 
3048   /* Cleanup our working space and context */
3049   free_thread_context(thcontext);
3050 
3051   return (NULL);
3052 }
3053 
3054 
/* Set up the synchronization objects of `context` and start its workers.
 *
 * When a threads callback is installed, only the per-thread contexts are
 * allocated and initialized; otherwise one pthread running t_blosc() is
 * created per requested thread.
 *
 * Returns 0 on success, BLOSC2_ERROR_MEMORY_ALLOC or
 * BLOSC2_ERROR_THREAD_CREATE on failure. */
int init_threadpool(blosc2_context *context) {
  /* Initialize mutex and condition variable objects */
  pthread_mutex_init(&context->count_mutex, NULL);
  pthread_mutex_init(&context->delta_mutex, NULL);
  pthread_cond_init(&context->delta_cv, NULL);

  /* Set context thread sentinels */
  context->thread_giveup_code = 1;
  context->thread_nblock = -1;

  /* Barrier initialization */
#ifdef BLOSC_POSIX_BARRIERS
  pthread_barrier_init(&context->barr_init, NULL, context->nthreads + 1);
  pthread_barrier_init(&context->barr_finish, NULL, context->nthreads + 1);
#else
  pthread_mutex_init(&context->count_threads_mutex, NULL);
  pthread_cond_init(&context->count_threads_cv, NULL);
  context->count_threads = 0;      /* Reset threads counter */
#endif

  if (threads_callback) {
    /* Create thread contexts to store data for callback threads */
    context->thread_contexts = (struct thread_context *)my_malloc(
            context->nthreads * sizeof(struct thread_context));
    BLOSC_ERROR_NULL(context->thread_contexts, BLOSC2_ERROR_MEMORY_ALLOC);
    for (int32_t tid = 0; tid < context->nthreads; tid++) {
      init_thread_context(&context->thread_contexts[tid], context, tid);
    }
  }
  else {
    #if !defined(_WIN32)
      /* Initialize and set thread detached attribute */
      pthread_attr_init(&context->ct_attr);
      pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
    #endif

    /* Make space for thread handlers */
    context->threads = (pthread_t*)my_malloc(
            context->nthreads * sizeof(pthread_t));
    BLOSC_ERROR_NULL(context->threads, BLOSC2_ERROR_MEMORY_ALLOC);

    /* Finally, create the threads */
    for (int32_t tid = 0; tid < context->nthreads; tid++) {
      int rc2;
      /* Create a thread context (will destroy when finished) */
      struct thread_context *worker_ctx = create_thread_context(context, tid);
      BLOSC_ERROR_NULL(worker_ctx, BLOSC2_ERROR_THREAD_CREATE);
      #if !defined(_WIN32)
        rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc,
                            (void*)worker_ctx);
      #else
        rc2 = pthread_create(&context->threads[tid], NULL, t_blosc,
                            (void *)worker_ctx);
      #endif
      if (rc2) {
        BLOSC_TRACE_ERROR("Return code from pthread_create() is %d.\n"
                          "\tError detail: %s\n", rc2, strerror(rc2));
        return BLOSC2_ERROR_THREAD_CREATE;
      }
    }
  }

  /* We have now started/initialized the threads */
  context->threads_started = context->nthreads;
  context->new_nthreads = context->nthreads;

  return 0;
}
3123 
blosc_get_nthreads(void)3124 int16_t blosc_get_nthreads(void)
3125 {
3126   return g_nthreads;
3127 }
3128 
blosc_set_nthreads(int16_t nthreads_new)3129 int16_t blosc_set_nthreads(int16_t nthreads_new) {
3130   int16_t ret = g_nthreads;          /* the previous number of threads */
3131 
3132   /* Check whether the library should be initialized */
3133   if (!g_initlib) blosc_init();
3134 
3135  if (nthreads_new != ret) {
3136    g_nthreads = nthreads_new;
3137    g_global_context->new_nthreads = nthreads_new;
3138    check_nthreads(g_global_context);
3139  }
3140 
3141   return ret;
3142 }
3143 
3144 
blosc_get_compressor(void)3145 const char* blosc_get_compressor(void)
3146 {
3147   const char* compname;
3148   blosc_compcode_to_compname(g_compressor, &compname);
3149 
3150   return compname;
3151 }
3152 
blosc_set_compressor(const char * compname)3153 int blosc_set_compressor(const char* compname) {
3154   int code = blosc_compname_to_compcode(compname);
3155   if (code >= BLOSC_LAST_CODEC) {
3156     BLOSC_TRACE_ERROR("User defined codecs cannot be set here. Use Blosc2 mechanism instead.");
3157     return -1;
3158   }
3159   g_compressor = code;
3160 
3161   /* Check whether the library should be initialized */
3162   if (!g_initlib) blosc_init();
3163 
3164   return code;
3165 }
3166 
blosc_set_delta(int dodelta)3167 void blosc_set_delta(int dodelta) {
3168 
3169   g_delta = dodelta;
3170 
3171   /* Check whether the library should be initialized */
3172   if (!g_initlib) blosc_init();
3173 
3174 }
3175 
blosc_list_compressors(void)3176 const char* blosc_list_compressors(void) {
3177   static int compressors_list_done = 0;
3178   static char ret[256];
3179 
3180   if (compressors_list_done) return ret;
3181   ret[0] = '\0';
3182   strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
3183   strcat(ret, ",");
3184   strcat(ret, BLOSC_LZ4_COMPNAME);
3185   strcat(ret, ",");
3186   strcat(ret, BLOSC_LZ4HC_COMPNAME);
3187 #if defined(HAVE_ZLIB)
3188   strcat(ret, ",");
3189   strcat(ret, BLOSC_ZLIB_COMPNAME);
3190 #endif /* HAVE_ZLIB */
3191 #if defined(HAVE_ZSTD)
3192   strcat(ret, ",");
3193   strcat(ret, BLOSC_ZSTD_COMPNAME);
3194 #endif /* HAVE_ZSTD */
3195   compressors_list_done = 1;
3196   return ret;
3197 }
3198 
3199 
blosc_get_version_string(void)3200 const char* blosc_get_version_string(void) {
3201   return BLOSC_VERSION_STRING;
3202 }
3203 
3204 
blosc_get_complib_info(const char * compname,char ** complib,char ** version)3205 int blosc_get_complib_info(const char* compname, char** complib, char** version) {
3206   int clibcode;
3207   const char* clibname;
3208   const char* clibversion = "unknown";
3209   char sbuffer[256];
3210 
3211   clibcode = compname_to_clibcode(compname);
3212   clibname = clibcode_to_clibname(clibcode);
3213 
3214   /* complib version */
3215   if (clibcode == BLOSC_BLOSCLZ_LIB) {
3216     clibversion = BLOSCLZ_VERSION_STRING;
3217   }
3218   else if (clibcode == BLOSC_LZ4_LIB) {
3219     sprintf(sbuffer, "%d.%d.%d",
3220             LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
3221     clibversion = sbuffer;
3222   }
3223 #if defined(HAVE_ZLIB)
3224   else if (clibcode == BLOSC_ZLIB_LIB) {
3225     clibversion = ZLIB_VERSION;
3226   }
3227 #endif /* HAVE_ZLIB */
3228 #if defined(HAVE_ZSTD)
3229   else if (clibcode == BLOSC_ZSTD_LIB) {
3230     sprintf(sbuffer, "%d.%d.%d",
3231             ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
3232     clibversion = sbuffer;
3233   }
3234 #endif /* HAVE_ZSTD */
3235 
3236 #ifdef _MSC_VER
3237   *complib = _strdup(clibname);
3238   *version = _strdup(clibversion);
3239 #else
3240   *complib = strdup(clibname);
3241   *version = strdup(clibversion);
3242 #endif
3243   return clibcode;
3244 }
3245 
/* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer.
 * size_t flavour of blosc2_cbuffer_sizes(): the three int32_t values it
 * reports are widened into the caller's size_t outputs. */
void blosc_cbuffer_sizes(const void* cbuffer, size_t* nbytes, size_t* cbytes, size_t* blocksize) {
  int32_t uncompressed32;
  int32_t compressed32;
  int32_t block32;

  blosc2_cbuffer_sizes(cbuffer, &uncompressed32, &compressed32, &block32);

  *nbytes = uncompressed32;
  *cbytes = compressed32;
  *blocksize = block32;
}
3254 
blosc2_cbuffer_sizes(const void * cbuffer,int32_t * nbytes,int32_t * cbytes,int32_t * blocksize)3255 int blosc2_cbuffer_sizes(const void* cbuffer, int32_t* nbytes, int32_t* cbytes, int32_t* blocksize) {
3256   blosc_header header;
3257   int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3258   if (rc < 0) {
3259     /* Return zeros if error reading header */
3260     memset(&header, 0, sizeof(header));
3261   }
3262 
3263   /* Read the interesting values */
3264   if (nbytes != NULL)
3265     *nbytes = header.nbytes;
3266   if (cbytes != NULL)
3267     *cbytes = header.cbytes;
3268   if (blocksize != NULL)
3269     *blocksize = header.blocksize;
3270   return rc;
3271 }
3272 
blosc_cbuffer_validate(const void * cbuffer,size_t cbytes,size_t * nbytes)3273 int blosc_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) {
3274   int32_t header_cbytes;
3275   int32_t header_nbytes;
3276   if (cbytes < BLOSC_MIN_HEADER_LENGTH) {
3277     /* Compressed data should contain enough space for header */
3278     *nbytes = 0;
3279     return BLOSC2_ERROR_WRITE_BUFFER;
3280   }
3281   int rc = blosc2_cbuffer_sizes(cbuffer, &header_nbytes, &header_cbytes, NULL);
3282   if (rc < 0) {
3283     *nbytes = 0;
3284     return rc;
3285   }
3286   *nbytes = header_nbytes;
3287   if (header_cbytes != cbytes) {
3288     /* Compressed size from header does not match `cbytes` */
3289     *nbytes = 0;
3290     return BLOSC2_ERROR_INVALID_HEADER;
3291   }
3292   if (*nbytes > BLOSC_MAX_BUFFERSIZE) {
3293     /* Uncompressed size is larger than allowed */
3294     *nbytes = 0;
3295     return BLOSC2_ERROR_MEMORY_ALLOC;
3296   }
3297   return 0;
3298 }
3299 
3300 /* Return `typesize` and `flags` from a compressed buffer. */
blosc_cbuffer_metainfo(const void * cbuffer,size_t * typesize,int * flags)3301 void blosc_cbuffer_metainfo(const void* cbuffer, size_t* typesize, int* flags) {
3302   blosc_header header;
3303   int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3304   if (rc < 0) {
3305     *typesize = *flags = 0;
3306     return;
3307   }
3308 
3309   /* Read the interesting values */
3310   *flags = header.flags;
3311   *typesize = header.typesize;
3312 }
3313 
3314 
3315 /* Return version information from a compressed buffer. */
blosc_cbuffer_versions(const void * cbuffer,int * version,int * versionlz)3316 void blosc_cbuffer_versions(const void* cbuffer, int* version, int* versionlz) {
3317   blosc_header header;
3318   int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3319   if (rc < 0) {
3320     *version = *versionlz = 0;
3321     return;
3322   }
3323 
3324   /* Read the version info */
3325   *version = header.version;
3326   *versionlz = header.versionlz;
3327 }
3328 
3329 
3330 /* Return the compressor library/format used in a compressed buffer. */
blosc_cbuffer_complib(const void * cbuffer)3331 const char* blosc_cbuffer_complib(const void* cbuffer) {
3332   blosc_header header;
3333   int clibcode;
3334   const char* complib;
3335   int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3336   if (rc < 0) {
3337     return NULL;
3338   }
3339 
3340   /* Read the compressor format/library info */
3341   clibcode = (header.flags & 0xe0) >> 5;
3342   complib = clibcode_to_clibname(clibcode);
3343   return complib;
3344 }
3345 
3346 
3347 /* Get the internal blocksize to be used during compression.  0 means
3348    that an automatic blocksize is computed internally. */
blosc_get_blocksize(void)3349 int blosc_get_blocksize(void)
3350 {
3351   return (int)g_force_blocksize;
3352 }
3353 
3354 
3355 /* Force the use of a specific blocksize.  If 0, an automatic
3356    blocksize will be used (the default). */
blosc_set_blocksize(size_t size)3357 void blosc_set_blocksize(size_t size) {
3358   g_force_blocksize = (int32_t)size;
3359 }
3360 
3361 
3362 /* Set pointer to super-chunk.  If NULL, no super-chunk will be
3363    reachable (the default). */
blosc_set_schunk(blosc2_schunk * schunk)3364 void blosc_set_schunk(blosc2_schunk* schunk) {
3365   g_schunk = schunk;
3366   g_global_context->schunk = schunk;
3367 }
3368 
/* Process-wide `blosc2_io` pointer; NULL until something assigns it.
   NOTE(review): its consumers are outside this section -- confirm the
   intended use before relying on it. */
blosc2_io *blosc2_io_global = NULL;
3370 
blosc_init(void)3371 void blosc_init(void) {
3372   /* Return if Blosc is already initialized */
3373   if (g_initlib) return;
3374 
3375   g_ncodecs = 0;
3376   g_nfilters = 0;
3377 
3378 #if defined(HAVE_PLUGINS)
3379   #include "blosc2/blosc2-common.h"
3380   #include "blosc2/blosc2-stdio.h"
3381   register_codecs();
3382   register_filters();
3383 #endif
3384   pthread_mutex_init(&global_comp_mutex, NULL);
3385   /* Create a global context */
3386   g_global_context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
3387   memset(g_global_context, 0, sizeof(blosc2_context));
3388   g_global_context->nthreads = g_nthreads;
3389   g_global_context->new_nthreads = g_nthreads;
3390   g_initlib = 1;
3391 }
3392 
3393 
blosc_destroy(void)3394 void blosc_destroy(void) {
3395   /* Return if Blosc is not initialized */
3396   if (!g_initlib) return;
3397 
3398   g_initlib = 0;
3399   blosc2_free_ctx(g_global_context);
3400 
3401   pthread_mutex_destroy(&global_comp_mutex);
3402 
3403 }
3404 
3405 
/* Tear down the worker-thread resources attached to `context`.

   Nothing is done unless `context->threads_started` is positive.  When a
   `threads_callback` is installed, only the per-thread contexts are
   destroyed and their array freed.  Otherwise `end_threads` is raised,
   WAIT_INIT is invoked, every started thread is joined (a join failure is
   only traced) and the thread handles are freed.  In both cases the
   mutexes, condition variables and barrier objects of the context are then
   destroyed, and `end_threads` / `threads_started` are reset to 0.

   The visible code paths always return 0.
   NOTE(review): WAIT_INIT is a macro defined elsewhere and may contain an
   early return of its own -- confirm. */
int release_threadpool(blosc2_context *context) {
  int32_t t;
  void* status;
  int rc;

  if (context->threads_started > 0) {
    if (threads_callback) {
      /* free context data for user-managed threads */
      for (t=0; t<context->threads_started; t++)
        destroy_thread_context(context->thread_contexts + t);
      my_free(context->thread_contexts);
    }
    else {
      /* Tell all existing threads to finish */
      context->end_threads = 1;
      /* NOTE(review): presumably lets the workers run so that they can
         observe `end_threads` -- confirm against the macro definition */
      WAIT_INIT(-1, context);

      /* Join exiting threads */
      for (t = 0; t < context->threads_started; t++) {
        rc = pthread_join(context->threads[t], &status);
        if (rc) {
          /* A failed join is reported but does not stop the teardown */
          BLOSC_TRACE_ERROR("Return code from pthread_join() is %d\n"
                            "\tError detail: %s.", rc, strerror(rc));
        }
      }

      /* Thread attributes (destroyed only on non-Windows builds) */
      #if !defined(_WIN32)
        pthread_attr_destroy(&context->ct_attr);
      #endif

      /* Release thread handlers */
      my_free(context->threads);
    }

    /* Release mutex and condition variable objects */
    pthread_mutex_destroy(&context->count_mutex);
    pthread_mutex_destroy(&context->delta_mutex);
    pthread_cond_destroy(&context->delta_cv);

    /* Barriers: POSIX barrier objects when BLOSC_POSIX_BARRIERS is
       defined, otherwise a mutex/condition-variable pair plus a counter */
  #ifdef BLOSC_POSIX_BARRIERS
    pthread_barrier_destroy(&context->barr_init);
    pthread_barrier_destroy(&context->barr_finish);
  #else
    pthread_mutex_destroy(&context->count_threads_mutex);
    pthread_cond_destroy(&context->count_threads_cv);
    context->count_threads = 0;      /* Reset threads counter */
  #endif

    /* Reset flags and counters */
    context->end_threads = 0;
    context->threads_started = 0;
  }


  return 0;
}
3464 
blosc_free_resources(void)3465 int blosc_free_resources(void) {
3466   /* Return if Blosc is not initialized */
3467   if (!g_initlib) return BLOSC2_ERROR_FAILURE;
3468 
3469   return release_threadpool(g_global_context);
3470 }
3471 
3472 
3473 /* Contexts */
3474 
3475 /* Create a context for compression */
blosc2_create_cctx(blosc2_cparams cparams)3476 blosc2_context* blosc2_create_cctx(blosc2_cparams cparams) {
3477   blosc2_context* context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
3478   BLOSC_ERROR_NULL(context, NULL);
3479 
3480   /* Populate the context, using zeros as default values */
3481   memset(context, 0, sizeof(blosc2_context));
3482   context->do_compress = 1;   /* meant for compression */
3483   context->compcode = cparams.compcode;
3484   context->compcode_meta = cparams.compcode_meta;
3485   context->clevel = cparams.clevel;
3486   context->use_dict = cparams.use_dict;
3487   context->typesize = cparams.typesize;
3488   for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
3489     context->filters[i] = cparams.filters[i];
3490     context->filters_meta[i] = cparams.filters_meta[i];
3491 
3492     if (context->filters[i] >= BLOSC_LAST_FILTER && context->filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
3493       BLOSC_TRACE_ERROR("filter (%d) is not yet defined",
3494                         context->filters[i]);
3495       free(context);
3496       return NULL;
3497     }
3498     if (context->filters[i] > BLOSC_LAST_REGISTERED_FILTER && context->filters[i] <= BLOSC2_GLOBAL_REGISTERED_FILTERS_STOP) {
3499       BLOSC_TRACE_ERROR("filter (%d) is not yet defined",
3500                         context->filters[i]);
3501       free(context);
3502       return NULL;
3503     }
3504   }
3505 
3506   context->nthreads = cparams.nthreads;
3507   context->new_nthreads = context->nthreads;
3508   context->blocksize = cparams.blocksize;
3509   context->splitmode = cparams.splitmode;
3510   context->threads_started = 0;
3511   context->schunk = cparams.schunk;
3512 
3513   if (cparams.prefilter != NULL) {
3514     context->prefilter = cparams.prefilter;
3515     context->preparams = (blosc2_prefilter_params*)my_malloc(sizeof(blosc2_prefilter_params));
3516     BLOSC_ERROR_NULL(context->preparams, NULL);
3517     memcpy(context->preparams, cparams.preparams, sizeof(blosc2_prefilter_params));
3518   }
3519 
3520   if (cparams.udbtune == NULL) {
3521     context->udbtune = &BTUNE_DEFAULTS;
3522   } else {
3523     context->udbtune = cparams.udbtune;
3524   }
3525 
3526   return context;
3527 }
3528 
3529 /* Create a context for decompression */
blosc2_create_dctx(blosc2_dparams dparams)3530 blosc2_context* blosc2_create_dctx(blosc2_dparams dparams) {
3531   blosc2_context* context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
3532   BLOSC_ERROR_NULL(context, NULL);
3533 
3534   /* Populate the context, using zeros as default values */
3535   memset(context, 0, sizeof(blosc2_context));
3536   context->do_compress = 0;   /* Meant for decompression */
3537   context->nthreads = dparams.nthreads;
3538   context->new_nthreads = context->nthreads;
3539   context->threads_started = 0;
3540   context->block_maskout = NULL;
3541   context->block_maskout_nitems = 0;
3542   context->schunk = dparams.schunk;
3543 
3544   if (dparams.postfilter != NULL) {
3545     context->postfilter = dparams.postfilter;
3546     context->postparams = (blosc2_postfilter_params*)my_malloc(sizeof(blosc2_postfilter_params));
3547     BLOSC_ERROR_NULL(context->postparams, NULL);
3548     memcpy(context->postparams, dparams.postparams, sizeof(blosc2_postfilter_params));
3549   }
3550 
3551   return context;
3552 }
3553 
3554 
/* Destroy `context`: release its thread pool first, then every resource
   it owns (serial thread context, dictionaries, btune state, pre/post
   filter parameters, block maskout) and finally the context itself. */
void blosc2_free_ctx(blosc2_context* context) {
  release_threadpool(context);

  if (context->serial_context != NULL) {
    free_thread_context(context->serial_context);
  }

#ifdef HAVE_ZSTD
  /* Dictionary objects are only released in builds with HAVE_ZSTD */
  if (context->dict_cdict != NULL) {
    ZSTD_freeCDict(context->dict_cdict);
  }
  if (context->dict_ddict != NULL) {
    ZSTD_freeDDict(context->dict_ddict);
  }
#endif

  if (context->btune != NULL) {
    context->udbtune->btune_free(context);
  }

  /* The parameter copies are released only when their filter is set */
  if (context->prefilter != NULL) {
    my_free(context->preparams);
  }
  if (context->postfilter != NULL) {
    my_free(context->postparams);
  }

  /* free(NULL) is a no-op, so the maskout needs no guard */
  free(context->block_maskout);

  my_free(context);
}
3585 
3586 
/* Fill `cparams` with the compression settings currently held by `ctx`.
   Pointer members (`schunk`, `preparams`, `udbtune`) are copied as
   pointers, so they refer to the same objects as the context.
   Always returns BLOSC2_ERROR_SUCCESS. */
int blosc2_ctx_get_cparams(blosc2_context *ctx, blosc2_cparams *cparams) {
  /* Codec selection and tuning knobs */
  cparams->compcode = ctx->compcode;
  cparams->compcode_meta = ctx->compcode_meta;
  cparams->clevel = ctx->clevel;
  cparams->use_dict = ctx->use_dict;

  /* Data layout and threading */
  cparams->typesize = ctx->typesize;
  cparams->nthreads = ctx->nthreads;
  cparams->blocksize = ctx->blocksize;
  cparams->splitmode = ctx->splitmode;
  cparams->schunk = ctx->schunk;

  /* Filter pipeline, slot by slot */
  for (int slot = 0; slot < BLOSC2_MAX_FILTERS; slot++) {
    cparams->filters[slot] = ctx->filters[slot];
    cparams->filters_meta[slot] = ctx->filters_meta[slot];
  }

  /* Prefilter and tuner hooks */
  cparams->prefilter = ctx->prefilter;
  cparams->preparams = ctx->preparams;
  cparams->udbtune = ctx->udbtune;

  return BLOSC2_ERROR_SUCCESS;
}
3607 
3608 
/* Fill `dparams` with the decompression settings currently held by `ctx`.
   `schunk` and `postparams` are copied as pointers, so they refer to the
   same objects as the context.  Always returns BLOSC2_ERROR_SUCCESS. */
int blosc2_ctx_get_dparams(blosc2_context *ctx, blosc2_dparams *dparams) {
  /* Threading and super-chunk association */
  dparams->nthreads = ctx->nthreads;
  dparams->schunk = ctx->schunk;

  /* Postfilter hook and its parameters */
  dparams->postfilter = ctx->postfilter;
  dparams->postparams = ctx->postparams;

  return BLOSC2_ERROR_SUCCESS;
}
3617 
3618 
3619 /* Set a maskout in decompression context */
blosc2_set_maskout(blosc2_context * ctx,bool * maskout,int nblocks)3620 int blosc2_set_maskout(blosc2_context *ctx, bool *maskout, int nblocks) {
3621 
3622   if (ctx->block_maskout != NULL) {
3623     // Get rid of a possible mask here
3624     free(ctx->block_maskout);
3625   }
3626 
3627   bool *maskout_ = malloc(nblocks);
3628   BLOSC_ERROR_NULL(maskout_, BLOSC2_ERROR_MEMORY_ALLOC);
3629   memcpy(maskout_, maskout, nblocks);
3630   ctx->block_maskout = maskout_;
3631   ctx->block_maskout_nitems = nblocks;
3632 
3633   return 0;
3634 }
3635 
3636 
3637 /* Create a chunk made of zeros */
blosc2_chunk_zeros(blosc2_cparams cparams,const size_t nbytes,void * dest,size_t destsize)3638 int blosc2_chunk_zeros(blosc2_cparams cparams, const size_t nbytes, void* dest, size_t destsize) {
3639   if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
3640     BLOSC_TRACE_ERROR("dest buffer is not long enough");
3641     return BLOSC2_ERROR_DATA;
3642   }
3643 
3644   if (nbytes % cparams.typesize) {
3645     BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
3646     return BLOSC2_ERROR_DATA;
3647   }
3648 
3649   blosc_header header;
3650   blosc2_context* context = blosc2_create_cctx(cparams);
3651 
3652   int error = initialize_context_compression(
3653           context, NULL, nbytes, dest, destsize,
3654           context->clevel, context->filters, context->filters_meta,
3655           context->typesize, context->compcode, context->blocksize,
3656           context->new_nthreads, context->nthreads,
3657           context->udbtune, context->btune, context->schunk);
3658   if (error <= 0) {
3659     blosc2_free_ctx(context);
3660     return error;
3661   }
3662 
3663   memset(&header, 0, sizeof(header));
3664   header.version = BLOSC_VERSION_FORMAT;
3665   header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
3666   header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
3667   header.typesize = context->typesize;
3668   header.nbytes = (int32_t)nbytes;
3669   header.blocksize = context->blocksize;
3670   header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
3671   header.blosc2_flags = BLOSC2_SPECIAL_ZERO << 4;  // mark chunk as all zeros
3672   memcpy((uint8_t *)dest, &header, sizeof(header));
3673 
3674   blosc2_free_ctx(context);
3675 
3676   return BLOSC_EXTENDED_HEADER_LENGTH;
3677 }
3678 
3679 
3680 /* Create a chunk made of uninitialized values */
blosc2_chunk_uninit(blosc2_cparams cparams,const size_t nbytes,void * dest,size_t destsize)3681 int blosc2_chunk_uninit(blosc2_cparams cparams, const size_t nbytes, void* dest, size_t destsize) {
3682   if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
3683     BLOSC_TRACE_ERROR("dest buffer is not long enough");
3684     return BLOSC2_ERROR_DATA;
3685   }
3686 
3687   if (nbytes % cparams.typesize) {
3688     BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
3689     return BLOSC2_ERROR_DATA;
3690   }
3691 
3692   blosc_header header;
3693   blosc2_context* context = blosc2_create_cctx(cparams);
3694   int error = initialize_context_compression(
3695           context, NULL, nbytes, dest, destsize,
3696           context->clevel, context->filters, context->filters_meta,
3697           context->typesize, context->compcode, context->blocksize,
3698           context->new_nthreads, context->nthreads,
3699           context->udbtune, context->btune, context->schunk);
3700   if (error <= 0) {
3701     blosc2_free_ctx(context);
3702     return error;
3703   }
3704 
3705   memset(&header, 0, sizeof(header));
3706   header.version = BLOSC_VERSION_FORMAT;
3707   header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
3708   header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
3709   header.typesize = context->typesize;
3710   header.nbytes = (int32_t)nbytes;
3711   header.blocksize = context->blocksize;
3712   header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
3713   header.blosc2_flags = BLOSC2_SPECIAL_UNINIT << 4;  // mark chunk as unitialized
3714   memcpy((uint8_t *)dest, &header, sizeof(header));
3715 
3716   blosc2_free_ctx(context);
3717 
3718   return BLOSC_EXTENDED_HEADER_LENGTH;
3719 }
3720 
3721 
3722 /* Create a chunk made of nans */
blosc2_chunk_nans(blosc2_cparams cparams,const size_t nbytes,void * dest,size_t destsize)3723 int blosc2_chunk_nans(blosc2_cparams cparams, const size_t nbytes, void* dest, size_t destsize) {
3724   if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
3725     BLOSC_TRACE_ERROR("dest buffer is not long enough");
3726     return BLOSC2_ERROR_DATA;
3727   }
3728 
3729   if (nbytes % cparams.typesize) {
3730     BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
3731     return BLOSC2_ERROR_DATA;
3732   }
3733 
3734   blosc_header header;
3735   blosc2_context* context = blosc2_create_cctx(cparams);
3736 
3737   int error = initialize_context_compression(
3738           context, NULL, nbytes, dest, destsize,
3739           context->clevel, context->filters, context->filters_meta,
3740           context->typesize, context->compcode, context->blocksize,
3741           context->new_nthreads, context->nthreads,
3742           context->udbtune, context->btune, context->schunk);
3743   if (error <= 0) {
3744     blosc2_free_ctx(context);
3745     return error;
3746   }
3747 
3748   memset(&header, 0, sizeof(header));
3749   header.version = BLOSC_VERSION_FORMAT;
3750   header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
3751   header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
3752   header.typesize = context->typesize;
3753   header.nbytes = (int32_t)nbytes;
3754   header.blocksize = context->blocksize;
3755   header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
3756   header.blosc2_flags = BLOSC2_SPECIAL_NAN << 4;  // mark chunk as all NaNs
3757   memcpy((uint8_t *)dest, &header, sizeof(header));
3758 
3759   blosc2_free_ctx(context);
3760 
3761   return BLOSC_EXTENDED_HEADER_LENGTH;
3762 }
3763 
3764 
3765 /* Create a chunk made of repeated values */
blosc2_chunk_repeatval(blosc2_cparams cparams,const size_t nbytes,void * dest,size_t destsize,void * repeatval)3766 int blosc2_chunk_repeatval(blosc2_cparams cparams, const size_t nbytes,
3767                            void* dest, size_t destsize, void* repeatval) {
3768   uint8_t typesize = cparams.typesize;
3769   if (destsize < BLOSC_EXTENDED_HEADER_LENGTH + typesize) {
3770     BLOSC_TRACE_ERROR("dest buffer is not long enough");
3771     return BLOSC2_ERROR_DATA;
3772   }
3773 
3774   if (nbytes % cparams.typesize) {
3775     BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
3776     return BLOSC2_ERROR_DATA;
3777   }
3778 
3779   blosc_header header;
3780   blosc2_context* context = blosc2_create_cctx(cparams);
3781 
3782   int error = initialize_context_compression(
3783           context, NULL, nbytes, dest, destsize,
3784           context->clevel, context->filters, context->filters_meta,
3785           context->typesize, context->compcode, context->blocksize,
3786           context->new_nthreads, context->nthreads,
3787           context->udbtune, context->btune, context->schunk);
3788   if (error <= 0) {
3789     blosc2_free_ctx(context);
3790     return error;
3791   }
3792 
3793   memset(&header, 0, sizeof(header));
3794   header.version = BLOSC_VERSION_FORMAT;
3795   header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
3796   header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
3797   header.typesize = (uint8_t)typesize;
3798   header.nbytes = (int32_t)nbytes;
3799   header.blocksize = context->blocksize;
3800   header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH + (int32_t)typesize;
3801   header.blosc2_flags = BLOSC2_SPECIAL_VALUE << 4;  // mark chunk as all repeated value
3802   memcpy((uint8_t *)dest, &header, sizeof(header));
3803   memcpy((uint8_t *)dest + sizeof(header), repeatval, typesize);
3804 
3805   blosc2_free_ctx(context);
3806 
3807   return BLOSC_EXTENDED_HEADER_LENGTH + (uint8_t)typesize;
3808 }
3809 
3810 
3811 /* Register filters */
3812 
/* Append a copy of `filter` to the `g_filters` table.

   Returns BLOSC2_ERROR_SUCCESS on success.  Fails with
   BLOSC2_ERROR_INVALID_PARAM (via BLOSC_ERROR_NULL) when `filter` is NULL,
   with BLOSC2_ERROR_CODEC_SUPPORT when `g_nfilters` has reached UINT8_MAX,
   and with BLOSC2_ERROR_FAILURE when the id lies outside
   [BLOSC2_GLOBAL_REGISTERED_FILTERS_START, BLOSC2_USER_REGISTERED_FILTERS_STOP]
   or is already present in the table. */
int register_filter_private(blosc2_filter *filter) {
  BLOSC_ERROR_NULL(filter, BLOSC2_ERROR_INVALID_PARAM);
  if (g_nfilters == UINT8_MAX) {
    BLOSC_TRACE_ERROR("Can not register more filters");
    return BLOSC2_ERROR_CODEC_SUPPORT;
  }
  if (filter->id < BLOSC2_GLOBAL_REGISTERED_FILTERS_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_FILTERS_START);
    return BLOSC2_ERROR_FAILURE;
  }
  if (filter->id > BLOSC2_USER_REGISTERED_FILTERS_STOP) {
    BLOSC_TRACE_ERROR("The id must be less than or equal to %d", BLOSC2_USER_REGISTERED_FILTERS_STOP);
    return BLOSC2_ERROR_FAILURE;
  }

  // Check if the filter is already registered
  for (int i = 0; i < g_nfilters; ++i) {
    if (g_filters[i].id == filter->id) {
      BLOSC_TRACE_ERROR("The filter is already registered!");
      return BLOSC2_ERROR_FAILURE;
    }
  }

  // Store a copy in the next free slot
  blosc2_filter *filter_new = &g_filters[g_nfilters++];
  memcpy(filter_new, filter, sizeof(blosc2_filter));

  return BLOSC2_ERROR_SUCCESS;
}
3841 
3842 
/* Public entry point for registering a filter.

   Rejects ids below BLOSC2_USER_REGISTERED_FILTERS_START with
   BLOSC2_ERROR_FAILURE; everything else is decided by
   register_filter_private().  A NULL `filter` is not dereferenced here:
   it is passed on to register_filter_private(), whose own NULL check
   reports it. */
int blosc2_register_filter(blosc2_filter *filter) {
  if (filter != NULL && filter->id < BLOSC2_USER_REGISTERED_FILTERS_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_USER_REGISTERED_FILTERS_START);
    return BLOSC2_ERROR_FAILURE;
  }

  return register_filter_private(filter);
}
3851 
3852 
3853 /* Register codecs */
3854 
/* Append a copy of `codec` to the `g_codecs` table.

   Returns BLOSC2_ERROR_SUCCESS on success.  Fails with
   BLOSC2_ERROR_INVALID_PARAM (via BLOSC_ERROR_NULL) when `codec` is NULL,
   with BLOSC2_ERROR_CODEC_SUPPORT when `g_ncodecs` has reached UINT8_MAX,
   with BLOSC2_ERROR_FAILURE when the compcode lies outside
   [BLOSC2_GLOBAL_REGISTERED_CODECS_START, BLOSC2_USER_REGISTERED_CODECS_STOP],
   and with BLOSC2_ERROR_CODEC_PARAM when the compcode is already present
   in the table. */
int register_codec_private(blosc2_codec *codec) {
  BLOSC_ERROR_NULL(codec, BLOSC2_ERROR_INVALID_PARAM);
  if (g_ncodecs == UINT8_MAX) {
    BLOSC_TRACE_ERROR("Can not register more codecs");
    return BLOSC2_ERROR_CODEC_SUPPORT;
  }
  if (codec->compcode < BLOSC2_GLOBAL_REGISTERED_CODECS_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_CODECS_START);
    return BLOSC2_ERROR_FAILURE;
  }
  if (codec->compcode > BLOSC2_USER_REGISTERED_CODECS_STOP) {
    BLOSC_TRACE_ERROR("The id must be less than or equal to %d", BLOSC2_USER_REGISTERED_CODECS_STOP);
    return BLOSC2_ERROR_FAILURE;
  }

  // Check if the code is already registered
  for (int i = 0; i < g_ncodecs; ++i) {
    if (g_codecs[i].compcode == codec->compcode) {
      BLOSC_TRACE_ERROR("The codec is already registered!");
      return BLOSC2_ERROR_CODEC_PARAM;
    }
  }

  // Store a copy in the next free slot
  blosc2_codec *codec_new = &g_codecs[g_ncodecs++];
  memcpy(codec_new, codec, sizeof(blosc2_codec));

  return BLOSC2_ERROR_SUCCESS;
}
3883 
3884 
/* Public entry point for registering a codec.

   Rejects compcodes below BLOSC2_USER_REGISTERED_CODECS_START with
   BLOSC2_ERROR_CODEC_PARAM; everything else is decided by
   register_codec_private().  A NULL `codec` is not dereferenced here:
   it is passed on to register_codec_private(), whose own NULL check
   reports it. */
int blosc2_register_codec(blosc2_codec *codec) {
  if (codec != NULL && codec->compcode < BLOSC2_USER_REGISTERED_CODECS_START) {
    BLOSC_TRACE_ERROR("The compcode must be greater or equal than %d", BLOSC2_USER_REGISTERED_CODECS_START);
    return BLOSC2_ERROR_CODEC_PARAM;
  }

  return register_codec_private(codec);
}
3893 
3894 
_blosc2_register_io_cb(const blosc2_io_cb * io)3895 int _blosc2_register_io_cb(const blosc2_io_cb *io) {
3896 
3897   // Check if the io is already registered
3898   for (int i = 0; i < g_nio; ++i) {
3899     if (g_io[i].id == io->id) {
3900       BLOSC_TRACE_ERROR("The codec is already registered!");
3901       return BLOSC2_ERROR_PLUGIN_IO;
3902     }
3903   }
3904 
3905   blosc2_io_cb *io_new = &g_io[g_nio++];
3906   memcpy(io_new, io, sizeof(blosc2_io_cb));
3907 
3908   return BLOSC2_ERROR_SUCCESS;
3909 }
3910 
blosc2_register_io_cb(const blosc2_io_cb * io)3911 int blosc2_register_io_cb(const blosc2_io_cb *io) {
3912   BLOSC_ERROR_NULL(io, BLOSC2_ERROR_INVALID_PARAM);
3913   if (g_nio == UINT8_MAX) {
3914     BLOSC_TRACE_ERROR("Can not register more codecs");
3915     return BLOSC2_ERROR_PLUGIN_IO;
3916   }
3917 
3918   if (io->id < BLOSC2_IO_REGISTERED) {
3919     BLOSC_TRACE_ERROR("The compcode must be greater or equal than %d", BLOSC2_IO_REGISTERED);
3920     return BLOSC2_ERROR_PLUGIN_IO;
3921   }
3922 
3923   return _blosc2_register_io_cb(io);
3924 }
3925 
blosc2_get_io_cb(uint8_t id)3926 blosc2_io_cb *blosc2_get_io_cb(uint8_t id) {
3927   for (int i = 0; i < g_nio; ++i) {
3928     if (g_io[i].id == id) {
3929       return &g_io[i];
3930     }
3931   }
3932   if (id == BLOSC2_IO_FILESYSTEM) {
3933     if (_blosc2_register_io_cb(&BLOSC2_IO_CB_DEFAULTS) < 0) {
3934       BLOSC_TRACE_ERROR("Error registering the default IO API");
3935       return NULL;
3936     }
3937     return blosc2_get_io_cb(id);
3938   }
3939   return NULL;
3940 }
3941