1 /*
2 Copyright (C) 2011 Serge Belyshev
3 Copyright (C) 2006-2016 Con Kolivas
4 Copyright (C) 2011 Peter Hyman
5 Copyright (C) 1998 Andrew Tridgell
6
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 */
20 /* multiplex N streams into a file - the streams are passed
21 through different compressors */
22
23 #ifdef HAVE_CONFIG_H
24 # include "config.h"
25 #endif
26
27 #ifdef HAVE_SYS_TIME_H
28 # include <sys/time.h>
29 #endif
30 #ifdef HAVE_SYS_TYPES_H
31 # include <sys/types.h>
32 #endif
33 #ifdef HAVE_SYS_RESOURCE_H
34 # include <sys/resource.h>
35 #endif
36 #ifdef HAVE_UNISTD_H
37 # include <unistd.h>
38 #endif
39 #include <sys/statvfs.h>
40 #include <pthread.h>
41 #include <bzlib.h>
42 #include <zlib.h>
43 #include <lzo/lzoconf.h>
44 #include <lzo/lzo1x.h>
45 #ifdef HAVE_ERRNO_H
46 # include <errno.h>
47 #endif
48 #ifdef HAVE_ENDIAN_H
49 # include <endian.h>
50 #elif HAVE_SYS_ENDIAN_H
51 # include <sys/endian.h>
52 #endif
53 #ifdef HAVE_ARPA_INET_H
54 # include <arpa/inet.h>
55 #endif
56
57 /* LZMA C Wrapper */
58 #include "lzma/C/LzmaLib.h"
59
60 #include "util.h"
61 #include "lrzip_core.h"
62
63 #define STREAM_BUFSIZE (1024 * 1024 * 10)
64
/* Per-thread state for the compression side. Each entry owns one buffer
 * which starts as uncompressed data and is replaced in place by its
 * compressed form by one of the *_compress_buf back ends. */
static struct compress_thread{
	uchar *s_buf;	/* Uncompressed buffer -> Compressed buffer */
	uchar c_type;	/* Compression type */
	i64 s_len;	/* Data length uncompressed */
	i64 c_len;	/* Data length compressed */
	cksem_t cksem; /* This thread's semaphore */
	struct stream_info *sinfo;	/* Stream set this thread writes to */
	int streamno;	/* Which stream within sinfo this block belongs to */
	uchar salt[SALT_LEN];	/* Per-block salt used when encrypting */
} *cthread;
75
/* Per-thread state for the decompression side. */
static struct uncomp_thread{
	uchar *s_buf;	/* Compressed buffer -> uncompressed buffer */
	i64 u_len, c_len;	/* Uncompressed and compressed lengths */
	i64 last_head;	/* NOTE(review): appears to be the offset of the next
			 * block header in this stream — confirm in callers */
	uchar c_type;	/* Compression type */
	int busy;	/* presumably set while this entry is being worked on — confirm */
	int streamno;
} *ucthread;
84
/* Argument bundle handed to stream worker threads */
typedef struct stream_thread_struct {
	int i;	/* Thread index */
	rzip_control *control;
} stream_thread_struct;
89
/* Index of the thread whose output must be written next; together with the
 * lock and condition variable below it serialises writing compressed blocks
 * in their original order. */
static long output_thread;
static pthread_mutex_t output_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t output_cond = PTHREAD_COND_INITIALIZER;
/* One pthread handle per back end worker thread */
static pthread_t *threads;
94
init_mutex(rzip_control * control,pthread_mutex_t * mutex)95 bool init_mutex(rzip_control *control, pthread_mutex_t *mutex)
96 {
97 if (unlikely(pthread_mutex_init(mutex, NULL)))
98 fatal_return(("pthread_mutex_init failed"), false);
99 return true;
100 }
101
unlock_mutex(rzip_control * control,pthread_mutex_t * mutex)102 bool unlock_mutex(rzip_control *control, pthread_mutex_t *mutex)
103 {
104 if (unlikely(pthread_mutex_unlock(mutex)))
105 fatal_return(("pthread_mutex_unlock failed"), false);
106 return true;
107 }
108
lock_mutex(rzip_control * control,pthread_mutex_t * mutex)109 bool lock_mutex(rzip_control *control, pthread_mutex_t *mutex)
110 {
111 if (unlikely(pthread_mutex_lock(mutex)))
112 fatal_return(("pthread_mutex_lock failed"), false);
113 return true;
114 }
115
cond_wait(rzip_control * control,pthread_cond_t * cond,pthread_mutex_t * mutex)116 static bool cond_wait(rzip_control *control, pthread_cond_t *cond, pthread_mutex_t *mutex)
117 {
118 if (unlikely(pthread_cond_wait(cond, mutex)))
119 fatal_return(("pthread_cond_wait failed"), false);
120 return true;
121 }
122
cond_broadcast(rzip_control * control,pthread_cond_t * cond)123 static bool cond_broadcast(rzip_control *control, pthread_cond_t *cond)
124 {
125 if (unlikely(pthread_cond_broadcast(cond)))
126 fatal_return(("pthread_cond_broadcast failed"), false);
127 return true;
128 }
129
create_pthread(rzip_control * control,pthread_t * thread,pthread_attr_t * attr,void * (* start_routine)(void *),void * arg)130 bool create_pthread(rzip_control *control, pthread_t *thread, pthread_attr_t * attr,
131 void * (*start_routine)(void *), void *arg)
132 {
133 if (unlikely(pthread_create(thread, attr, start_routine, arg)))
134 fatal_return(("pthread_create"), false);
135 return true;
136 }
137
detach_pthread(rzip_control * control,pthread_t * thread)138 bool detach_pthread(rzip_control *control, pthread_t *thread)
139 {
140 if (unlikely(pthread_detach(*thread)))
141 fatal_return(("pthread_detach"), false);
142 return true;
143 }
144
join_pthread(rzip_control * control,pthread_t th,void ** thread_return)145 bool join_pthread(rzip_control *control, pthread_t th, void **thread_return)
146 {
147 if (pthread_join(th, thread_return))
148 fatal_return(("pthread_join"), false);
149 return true;
150 }
151
152 /* just to keep things clean, declare function here
153 * but move body to the end since it's a work function
154 */
155 static int lzo_compresses(rzip_control *control, uchar *s_buf, i64 s_len);
156
157 /*
158 ***** COMPRESSION FUNCTIONS *****
159
160 ZPAQ, BZIP, GZIP, LZMA, LZO
161
162 try to compress a buffer. If compression fails for whatever reason then
163 leave uncompressed. Return the compression type in c_type and resulting
164 length in c_len
165 */
166
/* Compress cthread->s_buf with the ZPAQ back end. On success the thread's
 * buffer is replaced in place by the compressed copy and c_len/c_type are
 * updated. Incompressible data is left untouched as CTYPE_NONE.
 * Returns 0 on success or incompressible data, -1 on allocation failure. */
static int zpaq_compress_buf(rzip_control *control, struct compress_thread *cthread, long thread)
{
	i64 c_len, c_size;
	uchar *c_buf;

	/* Quick lzo test first: if even lzo cannot shrink it, skip the
	 * expensive zpaq pass entirely */
	if (!lzo_compresses(control, cthread->s_buf, cthread->s_len))
		return 0;

	c_size = round_up_page(control, cthread->s_len + 10000);
	c_buf = malloc(c_size);
	if (!c_buf) {
		print_err("Unable to allocate c_buf in zpaq_compress_buf\n");
		return -1;
	}

	c_len = 0;
	zpaq_compress(c_buf, &c_len, cthread->s_buf, cthread->s_len, control->compression_level / 4 + 1,
		      control->msgout, SHOW_PROGRESS ? true : false, thread);

	if (unlikely(c_len >= cthread->c_len)) {
		/* Incompressible, leave as CTYPE_NONE */
		print_maxverbose("Incompressible block\n");
		free(c_buf);
		return 0;
	}

	cthread->c_len = c_len;
	free(cthread->s_buf);
	cthread->s_buf = c_buf;
	cthread->c_type = CTYPE_ZPAQ;
	return 0;
}
200
/* Compress cthread->s_buf with bzip2, replacing the buffer in place on
 * success. Incompressible data is left untouched as CTYPE_NONE.
 * Returns 0 on success or incompressible data, -1 on failure. */
static int bzip2_compress_buf(rzip_control *control, struct compress_thread *cthread)
{
	u32 dlen = round_up_page(control, cthread->s_len);
	int bzip2_ret;
	uchar *c_buf;

	if (!lzo_compresses(control, cthread->s_buf, cthread->s_len))
		return 0;

	c_buf = malloc(dlen);
	if (!c_buf) {
		print_err("Unable to allocate c_buf in bzip2_compress_buf\n");
		return -1;
	}

	bzip2_ret = BZ2_bzBuffToBuffCompress((char *)c_buf, &dlen,
		(char *)cthread->s_buf, cthread->s_len,
		control->compression_level, 0, control->compression_level * 10);

	/* BZ_OUTBUFF_FULL means the output would exceed the input size, i.e.
	 * the block is incompressible; leave it as CTYPE_NONE */
	if (bzip2_ret == BZ_OUTBUFF_FULL) {
		print_maxverbose("Incompressible block\n");
		free(c_buf);
		return 0;
	}

	if (unlikely(bzip2_ret != BZ_OK)) {
		free(c_buf);
		print_maxverbose("BZ2 compress failed\n");
		return -1;
	}

	if (unlikely(dlen >= cthread->c_len)) {
		/* Compressed result no smaller than input; leave as CTYPE_NONE */
		print_maxverbose("Incompressible block\n");
		free(c_buf);
		return 0;
	}

	cthread->c_len = dlen;
	free(cthread->s_buf);
	cthread->s_buf = c_buf;
	cthread->c_type = CTYPE_BZIP2;
	return 0;
}
249
/* Compress cthread->s_buf with zlib's compress2, replacing the buffer in
 * place on success. Incompressible data is left untouched as CTYPE_NONE.
 * Returns 0 on success or incompressible data, -1 on failure. */
static int gzip_compress_buf(rzip_control *control, struct compress_thread *cthread)
{
	unsigned long dlen = round_up_page(control, cthread->s_len);
	uchar *c_buf;
	int gzip_ret;

	c_buf = malloc(dlen);
	if (!c_buf) {
		print_err("Unable to allocate c_buf in gzip_compress_buf\n");
		return -1;
	}

	gzip_ret = compress2(c_buf, &dlen, cthread->s_buf, cthread->s_len,
		control->compression_level);

	/* Z_BUF_ERROR means output didn't fit in an input-sized buffer, i.e.
	 * the block is incompressible; leave it as CTYPE_NONE */
	if (gzip_ret == Z_BUF_ERROR) {
		print_maxverbose("Incompressible block\n");
		free(c_buf);
		return 0;
	}

	if (unlikely(gzip_ret != Z_OK)) {
		free(c_buf);
		print_maxverbose("compress2 failed\n");
		return -1;
	}

	if (unlikely((i64)dlen >= cthread->c_len)) {
		/* Compressed result no smaller than input; leave as CTYPE_NONE */
		print_maxverbose("Incompressible block\n");
		free(c_buf);
		return 0;
	}

	cthread->c_len = dlen;
	free(cthread->s_buf);
	cthread->s_buf = c_buf;
	cthread->c_type = CTYPE_GZIP;
	return 0;
}
294
/* Compress cthread->s_buf with the LZMA SDK, replacing the buffer in place
 * on success. On SZ_ERROR_MEM the compression level is lowered and retried;
 * if no level fits in ram we fall back to bzip2. Incompressible data is left
 * as CTYPE_NONE. The 5-byte lzma properties produced by the first successful
 * thread are stored in control->lzma_properties for the archive header.
 * Returns 0 on success/incompressible, -1 on unrecoverable failure. */
static int lzma_compress_buf(rzip_control *control, struct compress_thread *cthread)
{
	unsigned char lzma_properties[5]; /* lzma properties, encoded */
	int lzma_level, lzma_ret;
	size_t prop_size = 5; /* return value for lzma_properties */
	uchar *c_buf;
	size_t dlen;

	if (!lzo_compresses(control, cthread->s_buf, cthread->s_len))
		return 0;

	/* only 7 levels with lzma, scale our 1-9 levels down */
	lzma_level = control->compression_level * 7 / 9;
	if (!lzma_level)
		lzma_level = 1;

	print_maxverbose("Starting lzma back end compression thread...\n");
retry:
	dlen = round_up_page(control, cthread->s_len);
	c_buf = malloc(dlen);
	if (!c_buf) {
		print_err("Unable to allocate c_buf in lzma_compress_buf\n");
		return -1;
	}

	/* with LZMA SDK 4.63, we pass compression level and threads only
	 * and receive properties in lzma_properties */
	lzma_ret = LzmaCompress(c_buf, &dlen, cthread->s_buf,
		(size_t)cthread->s_len, lzma_properties, &prop_size,
		lzma_level,
		0, /* dict size. set default, choose by level */
		-1, -1, -1, -1, /* lc, lp, pb, fb */
		control->threads);
	if (lzma_ret != SZ_OK) {
		switch (lzma_ret) {
		case SZ_ERROR_MEM:
			break;
		case SZ_ERROR_PARAM:
			print_err("LZMA Parameter ERROR: %d. This should not happen.\n", SZ_ERROR_PARAM);
			break;
		case SZ_ERROR_OUTPUT_EOF:
			print_maxverbose("Harmless LZMA Output Buffer Overflow error: %d. Incompressible block.\n", SZ_ERROR_OUTPUT_EOF);
			break;
		case SZ_ERROR_THREAD:
			print_err("LZMA Multi Thread ERROR: %d. This should not happen.\n", SZ_ERROR_THREAD);
			break;
		default:
			print_err("Unidentified LZMA ERROR: %d. This should not happen.\n", lzma_ret);
			break;
		}
		/* can pass -1 if not compressible! Thanks Lasse Collin */
		free(c_buf);
		if (lzma_ret == SZ_ERROR_MEM) {
			if (lzma_level > 1) {
				/* Not enough ram for this compression window;
				 * drop a level and try again */
				lzma_level--;
				print_verbose("LZMA Warning: %d. Can't allocate enough RAM for compression window, trying smaller.\n", SZ_ERROR_MEM);
				goto retry;
			}
			/* lzma compress can be fragile on 32 bit. If it fails,
			 * fall back to bzip2 compression so the block doesn't
			 * remain uncompressed */
			print_verbose("Unable to allocate enough RAM for any sized compression window, falling back to bzip2 compression.\n");
			return bzip2_compress_buf(control, cthread);
		} else if (lzma_ret == SZ_ERROR_OUTPUT_EOF)
			return 0;	/* Incompressible, leave as CTYPE_NONE */
		return -1;
	}

	if (unlikely((i64)dlen >= cthread->c_len)) {
		/* Incompressible, leave as CTYPE_NONE */
		print_maxverbose("Incompressible block\n");
		free(c_buf);
		return 0;
	}

	/* Make sure multiple threads don't race on writing lzma_properties */
	lock_mutex(control, &control->control_lock);
	if (!control->lzma_prop_set) {
		memcpy(control->lzma_properties, lzma_properties, 5);
		control->lzma_prop_set = true;
		/* Reset the magic written flag so we write it again if we
		 * get lzma properties and haven't written them yet. */
		if (TMP_OUTBUF)
			control->magic_written = 0;
	}
	unlock_mutex(control, &control->control_lock);

	cthread->c_len = dlen;
	free(cthread->s_buf);
	cthread->s_buf = c_buf;
	cthread->c_type = CTYPE_LZMA;
	return 0;
}
389
/* Compress cthread->s_buf with lzo1x_1, replacing the buffer in place on
 * success. Incompressible data is left untouched as CTYPE_NONE.
 * Returns 0 on success or incompressible data, -1 on allocation failure. */
static int lzo_compress_buf(rzip_control *control, struct compress_thread *cthread)
{
	lzo_uint in_len = cthread->s_len;
	/* lzo worst case expansion is len + len/16 + 64 + 3 */
	lzo_uint dlen = round_up_page(control, in_len + in_len / 16 + 64 + 3);
	lzo_bytep wrkmem;
	uchar *c_buf;
	int ret = -1;

	wrkmem = (lzo_bytep) calloc(1, LZO1X_1_MEM_COMPRESS);
	if (unlikely(wrkmem == NULL)) {
		print_maxverbose("Failed to malloc wkmem\n");
		return ret;
	}

	c_buf = malloc(dlen);
	if (!c_buf) {
		print_err("Unable to allocate c_buf in lzo_compress_buf");
		goto out_free;
	}

	/* lzo1x_1_compress does not return anything but LZO_OK so we ignore
	 * the return value */
	lzo1x_1_compress(cthread->s_buf, in_len, c_buf, &dlen, wrkmem);
	ret = 0;

	if (dlen >= in_len){
		/* Incompressible, leave as CTYPE_NONE */
		print_maxverbose("Incompressible block\n");
		free(c_buf);
		goto out_free;
	}

	cthread->c_len = dlen;
	free(cthread->s_buf);
	cthread->s_buf = c_buf;
	cthread->c_type = CTYPE_LZO;
out_free:
	free(wrkmem);
	return ret;
}
430
431 /*
432 ***** DECOMPRESSION FUNCTIONS *****
433
434 ZPAQ, BZIP, GZIP, LZMA, LZO
435
436 try to decompress a buffer. Return 0 on success and -1 on failure.
437 */
/* Decompress a ZPAQ block from ucthread->s_buf into a freshly-allocated
 * buffer of u_len bytes, replacing s_buf on success and freeing the old
 * compressed buffer. Returns 0 on success, -1 on failure.
 * Bug fix: the original restored the already-freed c_buf into s_buf on a
 * length mismatch (dangling pointer, leaking the new buffer); the restore
 * now happens only on the malloc-failure path. */
static int zpaq_decompress_buf(rzip_control *control __UNUSED__, struct uncomp_thread *ucthread, long thread)
{
	i64 dlen = ucthread->u_len;
	uchar *c_buf;
	int ret = 0;

	c_buf = ucthread->s_buf;
	ucthread->s_buf = malloc(round_up_page(control, dlen));
	if (unlikely(!ucthread->s_buf)) {
		print_err("Failed to allocate %ld bytes for decompression\n", dlen);
		/* Restore the compressed buffer so the caller still owns
		 * valid memory */
		ucthread->s_buf = c_buf;
		return -1;
	}

	dlen = 0;
	zpaq_decompress(ucthread->s_buf, &dlen, c_buf, ucthread->c_len,
			control->msgout, SHOW_PROGRESS ? true: false, thread);

	if (unlikely(dlen != ucthread->u_len)) {
		print_err("Inconsistent length after decompression. Got %ld bytes, expected %lld\n", dlen, ucthread->u_len);
		ret = -1;
	}

	free(c_buf);
	return ret;
}
467
/* Decompress a bzip2 block from ucthread->s_buf into a freshly-allocated
 * buffer of u_len bytes, replacing s_buf on success. Returns 0 on success,
 * -1 on failure.
 * Bug fix: the length-mismatch path no longer assigns the freed c_buf back
 * into s_buf (dangling pointer + leak of the new buffer). */
static int bzip2_decompress_buf(rzip_control *control __UNUSED__, struct uncomp_thread *ucthread)
{
	u32 dlen = ucthread->u_len;
	int ret = 0, bzerr;
	uchar *c_buf;

	c_buf = ucthread->s_buf;
	ucthread->s_buf = malloc(round_up_page(control, dlen));
	if (unlikely(!ucthread->s_buf)) {
		print_err("Failed to allocate %d bytes for decompression\n", dlen);
		/* Restore the compressed buffer so the caller still owns
		 * valid memory */
		ucthread->s_buf = c_buf;
		return -1;
	}

	bzerr = BZ2_bzBuffToBuffDecompress((char*)ucthread->s_buf, &dlen, (char*)c_buf, ucthread->c_len, 0, 0);
	if (unlikely(bzerr != BZ_OK)) {
		print_err("Failed to decompress buffer - bzerr=%d\n", bzerr);
		free(ucthread->s_buf);
		ucthread->s_buf = c_buf;
		return -1;
	}

	if (unlikely(dlen != ucthread->u_len)) {
		print_err("Inconsistent length after decompression. Got %d bytes, expected %lld\n", dlen, ucthread->u_len);
		ret = -1;
	}

	free(c_buf);
	return ret;
}
502
/* Decompress a zlib block from ucthread->s_buf into a freshly-allocated
 * buffer of u_len bytes, replacing s_buf on success. Returns 0 on success,
 * -1 on failure.
 * Bug fix: the length-mismatch path no longer assigns the freed c_buf back
 * into s_buf (dangling pointer + leak of the new buffer). */
static int gzip_decompress_buf(rzip_control *control __UNUSED__, struct uncomp_thread *ucthread)
{
	unsigned long dlen = ucthread->u_len;
	int ret = 0, gzerr;
	uchar *c_buf;

	c_buf = ucthread->s_buf;
	ucthread->s_buf = malloc(round_up_page(control, dlen));
	if (unlikely(!ucthread->s_buf)) {
		print_err("Failed to allocate %ld bytes for decompression\n", dlen);
		/* Restore the compressed buffer so the caller still owns
		 * valid memory */
		ucthread->s_buf = c_buf;
		return -1;
	}

	gzerr = uncompress(ucthread->s_buf, &dlen, c_buf, ucthread->c_len);
	if (unlikely(gzerr != Z_OK)) {
		print_err("Failed to decompress buffer - gzerr=%d\n", gzerr);
		free(ucthread->s_buf);
		ucthread->s_buf = c_buf;
		return -1;
	}

	if (unlikely((i64)dlen != ucthread->u_len)) {
		print_err("Inconsistent length after decompression. Got %ld bytes, expected %lld\n", dlen, ucthread->u_len);
		ret = -1;
	}

	free(c_buf);
	return ret;
}
537
/* Decompress an LZMA block from ucthread->s_buf into a freshly-allocated
 * buffer of u_len bytes, using the 5-byte properties stored in
 * control->lzma_properties. Replaces s_buf on success. Returns 0 on
 * success, -1 on failure.
 * Bug fix: the length-mismatch path no longer assigns the freed c_buf back
 * into s_buf (dangling pointer + leak of the new buffer). */
static int lzma_decompress_buf(rzip_control *control, struct uncomp_thread *ucthread)
{
	size_t dlen = ucthread->u_len;
	int ret = 0, lzmaerr;
	uchar *c_buf;
	SizeT c_len = ucthread->c_len;

	c_buf = ucthread->s_buf;
	ucthread->s_buf = malloc(round_up_page(control, dlen));
	if (unlikely(!ucthread->s_buf)) {
		print_err("Failed to allocate %lld bytes for decompression\n", (i64)dlen);
		/* Restore the compressed buffer so the caller still owns
		 * valid memory */
		ucthread->s_buf = c_buf;
		return -1;
	}

	/* With LZMA SDK 4.63 we pass control->lzma_properties
	 * which is needed for proper uncompress */
	lzmaerr = LzmaUncompress(ucthread->s_buf, &dlen, c_buf, &c_len, control->lzma_properties, 5);
	if (unlikely(lzmaerr)) {
		print_err("Failed to decompress buffer - lzmaerr=%d\n", lzmaerr);
		free(ucthread->s_buf);
		ucthread->s_buf = c_buf;
		return -1;
	}

	if (unlikely((i64)dlen != ucthread->u_len)) {
		print_err("Inconsistent length after decompression. Got %lld bytes, expected %lld\n", (i64)dlen, ucthread->u_len);
		ret = -1;
	}

	free(c_buf);
	return ret;
}
575
/* Decompress an lzo1x block from ucthread->s_buf into a freshly-allocated
 * buffer of u_len bytes, replacing s_buf on success. Returns 0 on success,
 * -1 on failure.
 * Bug fix: the length-mismatch path no longer assigns the freed c_buf back
 * into s_buf (dangling pointer + leak of the new buffer). */
static int lzo_decompress_buf(rzip_control *control __UNUSED__, struct uncomp_thread *ucthread)
{
	lzo_uint dlen = ucthread->u_len;
	int ret = 0, lzerr;
	uchar *c_buf;

	c_buf = ucthread->s_buf;
	ucthread->s_buf = malloc(round_up_page(control, dlen));
	if (unlikely(!ucthread->s_buf)) {
		print_err("Failed to allocate %lu bytes for decompression\n", (unsigned long)dlen);
		/* Restore the compressed buffer so the caller still owns
		 * valid memory */
		ucthread->s_buf = c_buf;
		return -1;
	}

	lzerr = lzo1x_decompress((uchar*)c_buf, ucthread->c_len, (uchar*)ucthread->s_buf, &dlen, NULL);
	if (unlikely(lzerr != LZO_E_OK)) {
		print_err("Failed to decompress buffer - lzerr=%d\n", lzerr);
		free(ucthread->s_buf);
		ucthread->s_buf = c_buf;
		return -1;
	}

	if (unlikely((i64)dlen != ucthread->u_len)) {
		print_err("Inconsistent length after decompression. Got %lu bytes, expected %lld\n", (unsigned long)dlen, ucthread->u_len);
		ret = -1;
	}

	free(c_buf);
	return ret;
}
610
611 /* WORK FUNCTIONS */
612
613 /* Look at whether we're writing to a ram location or physical files and write
614 * the data accordingly. */
/* Write ret bytes from offset_buf to the current output target: either the
 * real output fd, or the in-ram temporary output buffer. If the data no
 * longer fits in ram we flush everything to physical files and continue
 * there. Returns the number of bytes written or -1 on failure. */
ssize_t put_fdout(rzip_control *control, void *offset_buf, ssize_t ret)
{
	if (!TMP_OUTBUF)
		return write(control->fd_out, offset_buf, (size_t)ret);

	if (unlikely(control->out_ofs + ret > control->out_maxlen)) {
		/* The data won't fit in a temporary output buffer so we have
		 * to fall back to temporary files. */
		print_verbose("Unable to decompress entirely in ram, will use physical files\n");
		if (unlikely(control->fd_out == -1))
			failure("Was unable to decompress entirely in ram and no temporary file creation was possible\n");
		/* Flush what has been buffered so far, then the new data */
		if (unlikely(!write_fdout(control, control->tmp_outbuf, control->out_len))) {
			print_err("Unable to write_fdout tmpoutbuf in put_fdout\n");
			return -1;
		}
		close_tmpoutbuf(control);
		if (unlikely(!write_fdout(control, offset_buf, ret))) {
			print_err("Unable to write_fdout offset_buf in put_fdout\n");
			return -1;
		}
		return ret;
	}
	memcpy(control->tmp_outbuf + control->out_ofs, offset_buf, ret);
	control->out_ofs += ret;
	/* Track the high-water mark of data stored in the temp buffer */
	if (likely(control->out_ofs > control->out_len))
		control->out_len = control->out_ofs;
	return ret;
}
643
644 /* This is a custom version of write() which writes in 1GB chunks to avoid
645 the overflows at the >= 2GB mark thanks to 32bit fuckage. This should help
646 even on the rare occasion write() fails to write 1GB as well. */
/* This is a custom version of write() which writes in 1GB chunks to avoid
   the overflows at the >= 2GB mark thanks to 32bit fuckage. This should help
   even on the rare occasion write() fails to write 1GB as well.
   Returns the total bytes written, or the failing put_fdout result (<= 0). */
ssize_t write_1g(rzip_control *control, void *buf, i64 len)
{
	uchar *offset_buf = buf;
	ssize_t ret;
	i64 total;

	total = 0;
	while (len > 0) {
		ret = MIN(len, one_g);
		ret = put_fdout(control, offset_buf, (size_t)ret);
		if (unlikely(ret <= 0))
			return ret;
		len -= ret;
		offset_buf += ret;
		total += ret;
	}
	return total;
}
665
666 /* Should be called only if we know the buffer will be large enough, otherwise
667 * we must dump_stdin first */
read_fdin(struct rzip_control * control,i64 len)668 static bool read_fdin(struct rzip_control *control, i64 len)
669 {
670 int tmpchar;
671 i64 i;
672
673 for (i = 0; i < len; i++) {
674 tmpchar = getchar();
675 if (unlikely(tmpchar == EOF))
676 failure_return(("Reached end of file on STDIN prematurely on read_fdin, asked for %lld got %lld\n",
677 len, i), false);
678 control->tmp_inbuf[control->in_ofs + i] = (char)tmpchar;
679 }
680 control->in_len = control->in_ofs + len;
681 return true;
682 }
683
684 /* Dump STDIN into a temporary file */
/* Dump STDIN into a temporary file and release the in-ram input buffer.
 * Returns 0 on success, -1 on failure. */
static int dump_stdin(rzip_control *control)
{
	if (unlikely(!write_fdin(control)))
		return -1;
	if (unlikely(!read_tmpinfile(control, control->fd_in)))
		return -1;
	close_tmpinbuf(control);
	return 0;
}
694
695 /* Ditto for read */
/* Chunked counterpart of write_1g: read len bytes in at most 1GB chunks,
 * transparently serving reads from the in-ram temporary input/output
 * buffers when active. Returns bytes read or -1 on failure.
 * Fixes: the failure message contained "%compress" whose stray %c conversion
 * had no matching argument (undefined behavior in the printf family); and a
 * bool `false` was returned from this ssize_t function — now -1. */
ssize_t read_1g(rzip_control *control, int fd, void *buf, i64 len)
{
	uchar *offset_buf = buf;
	ssize_t ret;
	i64 total;

	if (TMP_INBUF && fd == control->fd_in) {
		/* We're decompressing from STDIN */
		if (unlikely(control->in_ofs + len > control->in_maxlen)) {
			/* We're unable to fit it all into the temp buffer */
			if (dump_stdin(control))
				failure_return(("Inadequate ram to decompress from STDIN and unable to create in tmpfile"), -1);
			goto read_fd;
		}
		if (control->in_ofs + len > control->in_len) {
			if (unlikely(!read_fdin(control, control->in_ofs + len - control->in_len)))
				return -1;
		}
		memcpy(buf, control->tmp_inbuf + control->in_ofs, len);
		control->in_ofs += len;
		return len;
	}

	if (TMP_OUTBUF && fd == control->fd_out) {
		if (unlikely(control->out_ofs + len > control->out_maxlen))
			failure_return(("Trying to read beyond out_ofs in tmpoutbuf\n"), -1);
		memcpy(buf, control->tmp_outbuf + control->out_ofs, len);
		control->out_ofs += len;
		return len;
	}

read_fd:
	total = 0;
	while (len > 0) {
		ret = MIN(len, one_g);
		ret = read(fd, offset_buf, (size_t)ret);
		if (unlikely(ret <= 0))
			return ret;
		len -= ret;
		offset_buf += ret;
		total += ret;
	}
	return total;
}
740
741 /* write to a file, return 0 on success and -1 on failure */
/* Write exactly len bytes, treating a short write as an error.
 * Returns 0 on success and -1 on failure. */
static int write_buf(rzip_control *control, uchar *p, i64 len)
{
	ssize_t ret;

	ret = write_1g(control, p, (size_t)len);
	if (unlikely(ret == -1)) {
		print_err("Write of length %lld failed - %s\n", len, strerror(errno));
		return -1;
	}
	if (unlikely(ret != (ssize_t)len)) {
		print_err("Partial write!? asked for %lld bytes but got %lld\n", len, (i64)ret);
		return -1;
	}
	return 0;
}
757
758 /* write a byte */
/* Write a single byte to the output stream */
static inline int write_u8(rzip_control *control, uchar v)
{
	return write_buf(control, &v, 1);
}
763
/* Write the low len bytes of v in little-endian order: after htole64 the
 * in-memory representation is the LE byte sequence on any host, so writing
 * the first len bytes is portable. */
static inline int write_val(rzip_control *control, i64 v, int len)
{
	v = htole64(v);
	return write_buf(control, (uchar *)&v, len);
}
769
/* Read exactly len bytes, treating a short read as an error.
 * Returns 0 on success and -1 on failure. */
static int read_buf(rzip_control *control, int f, uchar *p, i64 len)
{
	ssize_t ret;

	ret = read_1g(control, f, p, (size_t)len);
	if (unlikely(ret == -1)) {
		print_err("Read of length %lld failed - %s\n", len, strerror(errno));
		return -1;
	}
	if (unlikely(ret != (ssize_t)len)) {
		print_err("Partial read!? asked for %lld bytes but got %lld\n", len, (i64)ret);
		return -1;
	}
	return 0;
}
785
/* Read a single byte from the input stream */
static inline int read_u8(rzip_control *control, int f, uchar *v)
{
	return read_buf(control, f, v, 1);
}
790
/* Read a little-endian 32 bit value, converting to host order */
static inline int read_u32(rzip_control *control, int f, u32 *v)
{
	int ret = read_buf(control, f, (uchar *)v, 4);

	*v = le32toh(*v);
	return ret;
}
798
/* Read len (<= 8) little-endian bytes into the low bytes of *v.
 * Returns 0 on success, -1 on failure. */
static inline int read_val(rzip_control *control, int f, i64 *v, int len)
{
	int ret;

	/* We only partially read all 8 bytes so have to zero v here */
	*v = 0;
	ret = read_buf(control, f, (uchar *)v, len);
	return ret;
}
808
/* Seek the stream's fd to absolute position spos; pos is the logical
 * stream-relative position used only for the error message.
 * Returns 0 on success, -1 on failure. */
static int fd_seekto(rzip_control *control, struct stream_info *sinfo, i64 spos, i64 pos)
{
	if (unlikely(lseek(sinfo->fd, spos, SEEK_SET) != spos)) {
		print_err("Failed to seek to %lld in stream\n", pos);
		return -1;
	}
	return 0;
}
817
818 /* seek to a position within a set of streams - return -1 on failure */
/* Seek to a position within a set of output streams, handling the in-ram
 * temporary output buffer when active - return -1 on failure */
static int seekto(rzip_control *control, struct stream_info *sinfo, i64 pos)
{
	i64 spos = pos + sinfo->initial_pos;

	if (TMP_OUTBUF) {
		/* Positions in the temp buffer are relative to what has
		 * already been flushed out (out_relofs) */
		spos -= control->out_relofs;
		control->out_ofs = spos;
		if (unlikely(spos > control->out_len || spos < 0)) {
			print_err("Trying to seek to %lld outside tmp outbuf in seekto\n", spos);
			return -1;
		}
		return 0;
	}

	return fd_seekto(control, sinfo, spos, pos);
}
835
/* Seek to a read position within a set of streams. When reading from the
 * in-ram STDIN buffer, data up to the target position is pulled in first;
 * if it cannot fit, STDIN is dumped to a temp file and we seek that instead.
 * Returns 0 on success, -1 on failure. */
static int read_seekto(rzip_control *control, struct stream_info *sinfo, i64 pos)
{
	i64 spos = pos + sinfo->initial_pos;

	if (TMP_INBUF) {
		if (spos > control->in_len) {
			i64 len = spos - control->in_len;

			if (control->in_ofs + len > control->in_maxlen) {
				/* Too big for the ram buffer; fall back to a
				 * temporary file */
				if (unlikely(dump_stdin(control)))
					return -1;
				goto fd_seek;
			} else {
				if (unlikely(!read_fdin(control, len)))
					return -1;
			}
		}
		control->in_ofs = spos;
		if (unlikely(spos < 0)) {
			print_err("Trying to seek to %lld outside tmp inbuf in read_seekto\n", spos);
			return -1;
		}
		return 0;
	}
fd_seek:
	return fd_seekto(control, sinfo, spos, pos);
}
863
get_seek(rzip_control * control,int fd)864 static i64 get_seek(rzip_control *control, int fd)
865 {
866 i64 ret;
867
868 if (TMP_OUTBUF)
869 return control->out_relofs + control->out_ofs;
870 ret = lseek(fd, 0, SEEK_CUR);
871 if (unlikely(ret == -1))
872 fatal_return(("Failed to lseek in get_seek\n"), -1);
873 return ret;
874 }
875
get_readseek(rzip_control * control,int fd)876 i64 get_readseek(rzip_control *control, int fd)
877 {
878 i64 ret;
879
880 if (TMP_INBUF)
881 return control->in_ofs;
882 ret = lseek(fd, 0, SEEK_CUR);
883 if (unlikely(ret == -1))
884 fatal_return(("Failed to lseek in get_seek\n"), -1);
885 return ret;
886 }
887
prepare_streamout_threads(rzip_control * control)888 bool prepare_streamout_threads(rzip_control *control)
889 {
890 int i;
891
892 /* As we serialise the generation of threads during the rzip
893 * pre-processing stage, it's faster to have one more thread available
894 * to keep all CPUs busy. There is no point splitting up the chunks
895 * into multiple threads if there will be no compression back end. */
896 if (control->threads > 1)
897 ++control->threads;
898 if (NO_COMPRESS)
899 control->threads = 1;
900 threads = calloc(sizeof(pthread_t), control->threads);
901 if (unlikely(!threads))
902 fatal_return(("Unable to calloc threads in prepare_streamout_threads\n"), false);
903
904 cthread = calloc(sizeof(struct compress_thread), control->threads);
905 if (unlikely(!cthread)) {
906 free(threads);
907 fatal_return(("Unable to calloc cthread in prepare_streamout_threads\n"), false);
908 }
909
910 for (i = 0; i < control->threads; i++) {
911 cksem_init(control, &cthread[i].cksem);
912 cksem_post(control, &cthread[i].cksem);
913 }
914 return true;
915 }
916
917
close_streamout_threads(rzip_control * control)918 bool close_streamout_threads(rzip_control *control)
919 {
920 int i, close_thread = output_thread;
921
922 /* Wait for the threads in the correct order in case they end up
923 * serialised */
924 for (i = 0; i < control->threads; i++) {
925 cksem_wait(control, &cthread[close_thread].cksem);
926
927 if (++close_thread == control->threads)
928 close_thread = 0;
929 }
930 free(cthread);
931 free(threads);
932 return true;
933 }
934
935 /* open a set of output streams, compressing with the given
936 compression level and algorithm */
/* Open a set of n output streams on fd f, compressing with the configured
 * level and algorithm. Sizes the per-stream buffers to the largest amount
 * the system can realistically malloc, shrinking the buffer (and the thread
 * count) until test allocations succeed. Returns an opaque stream_info
 * pointer, or NULL on failure.
 * Fix: stream buffers already allocated are now freed if a later buffer
 * allocation fails (previously leaked). */
void *open_stream_out(rzip_control *control, int f, unsigned int n, i64 chunk_limit, char cbytes)
{
	struct stream_info *sinfo;
	i64 testsize, limit;
	uchar *testmalloc;
	unsigned int i, testbufs;

	sinfo = calloc(sizeof(struct stream_info), 1);
	if (unlikely(!sinfo))
		return NULL;
	if (chunk_limit < control->page_size)
		chunk_limit = control->page_size;
	sinfo->bufsize = sinfo->size = limit = chunk_limit;

	sinfo->chunk_bytes = cbytes;
	sinfo->num_streams = n;
	sinfo->fd = f;

	sinfo->s = calloc(sizeof(struct stream), n);
	if (unlikely(!sinfo->s)) {
		free(sinfo);
		return NULL;
	}

	/* Find the largest we can make the window based on ability to malloc
	 * ram. We need 2 buffers for each compression thread and the overhead
	 * of each compression back end. No 2nd buf is required when there is
	 * no back end compression. We limit the total regardless to 1/3 ram
	 * for when the OS lies due to heavy overcommit. */
	if (NO_COMPRESS)
		testbufs = 1;
	else
		testbufs = 2;

	testsize = (limit * testbufs) + (control->overhead * control->threads);
	if (testsize > control->usable_ram)
		limit = (control->usable_ram - (control->overhead * control->threads)) / testbufs;

	/* If we don't have enough ram for the number of threads, decrease the
	 * number of threads till we do, or only have one thread. */
	while (limit < STREAM_BUFSIZE && limit < chunk_limit) {
		if (control->threads > 1)
			--control->threads;
		else
			break;
		limit = (control->usable_ram - (control->overhead * control->threads)) / testbufs;
		limit = MIN(limit, chunk_limit);
	}
	if (BITS32) {
		/* Cap the address space footprint on 32 bit builds */
		limit = MIN(limit, one_g);
		if (limit + (control->overhead * control->threads) > one_g)
			limit = one_g - (control->overhead * control->threads);
	}
	/* Use a nominal minimum size should we fail all previous shrinking */
	limit = MAX(limit, STREAM_BUFSIZE);
	limit = MIN(limit, chunk_limit);
retest_malloc:
	/* Prove the allocations will actually succeed before committing,
	 * shrinking by 10% per failed attempt */
	testsize = limit + (control->overhead * control->threads);
	testmalloc = malloc(testsize);
	if (!testmalloc) {
		limit = limit / 10 * 9;
		goto retest_malloc;
	}
	if (!NO_COMPRESS) {
		char *testmalloc2 = malloc(limit);

		if (!testmalloc2) {
			free(testmalloc);
			limit = limit / 10 * 9;
			goto retest_malloc;
		}
		free(testmalloc2);
	}
	free(testmalloc);
	print_maxverbose("Succeeded in testing %lld sized malloc for back end compression\n", testsize);

	/* Make the bufsize no smaller than STREAM_BUFSIZE. Round up the
	 * bufsize to fit X threads into it */
	sinfo->bufsize = MIN(limit, MAX((limit + control->threads - 1) / control->threads,
					STREAM_BUFSIZE));

	if (control->threads > 1)
		print_maxverbose("Using up to %d threads to compress up to %lld bytes each.\n",
			control->threads, sinfo->bufsize);
	else
		print_maxverbose("Using only 1 thread to compress up to %lld bytes\n",
			sinfo->bufsize);

	for (i = 0; i < n; i++) {
		sinfo->s[i].buf = calloc(sinfo->bufsize , 1);
		if (unlikely(!sinfo->s[i].buf)) {
			fatal("Unable to malloc buffer of size %lld in open_stream_out\n", sinfo->bufsize);
			/* Free buffers already allocated before bailing out */
			while (i > 0)
				free(sinfo->s[--i].buf);
			free(sinfo->s);
			free(sinfo);
			return NULL;
		}
	}

	return (void *)sinfo;
}
1037
1038 /* The block headers are all encrypted so we read the data and salt associated
1039 * with them, decrypt the data, then return the decrypted version of the
1040 * values */
static bool decrypt_header(rzip_control *control, uchar *head, uchar *c_type,
			   i64 *c_len, i64 *u_len, i64 *last_head)
{
	uchar *enc = head + SALT_LEN;

	/* Serialise the raw header fields into the 25 byte work area that
	 * sits just past the salt: 1 byte c_type, then three 8 byte values */
	enc[0] = *c_type;
	memcpy(enc + 1, c_len, 8);
	memcpy(enc + 1 + 8, u_len, 8);
	memcpy(enc + 1 + 16, last_head, 8);

	/* head holds the SALT_LEN salt bytes that precede the header */
	if (unlikely(!lrz_decrypt(control, enc, 25, head)))
		return false;

	/* Unpack the decrypted fields back into the caller's variables */
	*c_type = enc[0];
	memcpy(c_len, enc + 1, 8);
	memcpy(u_len, enc + 1 + 8, 8);
	memcpy(last_head, enc + 1 + 16, 8);
	return true;
}
1060
1061 /* prepare a set of n streams for reading on file descriptor f */
/* Prepare a set of n streams for reading on file descriptor f.
 * Allocates the stream_info block, the global thread arrays, reads the
 * chunk header (eof flag, chunk size for >= v0.6 archives) and validates
 * the initial per-stream block headers. Returns the stream_info pointer,
 * or NULL on failure (all allocations are released on every error path). */
void *open_stream_in(rzip_control *control, int f, int n, char chunk_bytes)
{
	struct stream_info *sinfo;
	int total_threads, i;
	i64 header_length;

	sinfo = calloc(sizeof(struct stream_info), 1);
	if (unlikely(!sinfo))
		return NULL;

	/* We have one thread dedicated to stream 0, and one more thread than
	 * CPUs to keep them busy, unless we're running single-threaded. */
	if (control->threads > 1)
		total_threads = control->threads + 2;
	else
		total_threads = control->threads + 1;
	threads = calloc(sizeof(pthread_t), total_threads);
	if (unlikely(!threads)) {
		/* Fix: sinfo was previously leaked on this path */
		free(sinfo);
		return NULL;
	}

	ucthread = calloc(sizeof(struct uncomp_thread), total_threads);
	if (unlikely(!ucthread)) {
		free(sinfo);
		free(threads);
		threads = NULL;
		fatal_return(("Unable to calloc cthread in open_stream_in\n"), NULL);
	}

	sinfo->num_streams = n;
	sinfo->fd = f;
	sinfo->chunk_bytes = chunk_bytes;

	sinfo->s = calloc(sizeof(struct stream), n);
	if (unlikely(!sinfo->s)) {
		/* Fix: also release the global thread arrays on failure */
		free(ucthread);
		ucthread = NULL;
		free(threads);
		threads = NULL;
		free(sinfo);
		return NULL;
	}

	/* Stream 0 decompresses serially with one thread; all the remaining
	 * threads service stream 1. Guard the s[1] write for n < 2. */
	sinfo->s[0].total_threads = 1;
	if (n > 1)
		sinfo->s[1].total_threads = total_threads - 1;

	if (control->major_version == 0 && control->minor_version > 5) {
		/* Read in flag that tells us if there are more chunks after
		 * this. Ignored if we know the final file size */
		print_maxverbose("Reading eof flag at %lld\n", get_readseek(control, f));
		if (unlikely(read_u8(control, f, &control->eof))) {
			print_err("Failed to read eof flag in open_stream_in\n");
			goto failed;
		}
		print_maxverbose("EOF: %d\n", control->eof);

		/* Read in the expected chunk size */
		if (!ENCRYPT) {
			print_maxverbose("Reading expected chunksize at %lld\n", get_readseek(control, f));
			if (unlikely(read_val(control, f, &sinfo->size, sinfo->chunk_bytes))) {
				print_err("Failed to read in chunk size in open_stream_in\n");
				goto failed;
			}
			sinfo->size = le64toh(sinfo->size);
			print_maxverbose("Chunk size: %lld\n", sinfo->size);
			control->st_size += sinfo->size;
		}
	}
	sinfo->initial_pos = get_readseek(control, f);
	if (unlikely(sinfo->initial_pos == -1))
		goto failed;

	for (i = 0; i < n; i++) {
		uchar c, enc_head[25 + SALT_LEN];
		i64 v1, v2;

		sinfo->s[i].base_thread = i;
		sinfo->s[i].uthread_no = sinfo->s[i].base_thread;
		sinfo->s[i].unext_thread = sinfo->s[i].base_thread;

		if (unlikely(ENCRYPT && read_buf(control, f, enc_head, SALT_LEN)))
			goto failed;
again:
		if (unlikely(read_u8(control, f, &c)))
			goto failed;

		/* Compatibility crap for versions < 0.40 */
		if (control->major_version == 0 && control->minor_version < 4) {
			u32 v132, v232, last_head32;

			if (unlikely(read_u32(control, f, &v132)))
				goto failed;
			if (unlikely(read_u32(control, f, &v232)))
				goto failed;
			if (unlikely(read_u32(control, f, &last_head32)))
				goto failed;

			v1 = v132;
			v2 = v232;
			sinfo->s[i].last_head = last_head32;
			header_length = 13;
		} else {
			int read_len;

			print_maxverbose("Reading stream %d header at %lld\n", i, get_readseek(control, f));
			/* Pre-0.6 and encrypted archives always use full width
			 * 8 byte values; later ones use chunk_bytes width */
			if ((control->major_version == 0 && control->minor_version < 6) ||
			    ENCRYPT)
				read_len = 8;
			else
				read_len = sinfo->chunk_bytes;
			if (unlikely(read_val(control, f, &v1, read_len)))
				goto failed;
			if (unlikely(read_val(control, f, &v2, read_len)))
				goto failed;
			if (unlikely(read_val(control, f, &sinfo->s[i].last_head, read_len)))
				goto failed;
			header_length = 1 + (read_len * 3);
		}
		sinfo->total_read += header_length;

		if (ENCRYPT) {
			if (unlikely(!decrypt_header(control, enc_head, &c, &v1, &v2, &sinfo->s[i].last_head)))
				goto failed;
			sinfo->total_read += SALT_LEN;
		}

		/* Values are stored little-endian on disk */
		v1 = le64toh(v1);
		v2 = le64toh(v2);
		sinfo->s[i].last_head = le64toh(sinfo->s[i].last_head);

		if (unlikely(c == CTYPE_NONE && v1 == 0 && v2 == 0 && sinfo->s[i].last_head == 0 && i == 0)) {
			print_err("Enabling stream close workaround\n");
			sinfo->initial_pos += header_length;
			goto again;
		}

		/* The initial header of each stream must be an all-zero
		 * CTYPE_NONE placeholder written by the compress side */
		if (unlikely(c != CTYPE_NONE)) {
			print_err("Unexpected initial tag %d in streams\n", c);
			if (ENCRYPT)
				print_err("Wrong password?\n");
			goto failed;
		}
		if (unlikely(v1)) {
			print_err("Unexpected initial c_len %lld in streams %lld\n", v1, v2);
			goto failed;
		}
		if (unlikely(v2)) {
			print_err("Unexpected initial u_len %lld in streams\n", v2);
			goto failed;
		}
	}

	return (void *)sinfo;

failed:
	/* Fix: the thread arrays were previously leaked on this path */
	free(ucthread);
	ucthread = NULL;
	free(threads);
	threads = NULL;
	free(sinfo->s);
	free(sinfo);
	return NULL;
}
1215
1216 #define MIN_SIZE (ENCRYPT ? CBC_LEN : 0)
1217
1218 /* Once the final data has all been written to the block header, we go back
1219 * and write SALT_LEN bytes of salt before it, and encrypt the header in place
1220 * by reading what has been written, encrypting it, and writing back over it.
1221 * This is very convoluted depending on whether a last_head value is written
1222 * to this block or not. See the callers of this function */
static bool rewrite_encrypted(rzip_control *control, struct stream_info *sinfo, i64 ofs)
{
	uchar *buf, *head;
	i64 cur_ofs;

	/* Remember the current position so it can be restored afterwards.
	 * NOTE(review): get_seek returns -1 on failure, but initial_pos is
	 * subtracted before the test — the -1 check looks ineffective for
	 * nonzero initial_pos; confirm against get_seek's contract. */
	cur_ofs = get_seek(control, sinfo->fd) - sinfo->initial_pos;
	if (unlikely(cur_ofs == -1))
		return false;
	head = malloc(25 + SALT_LEN);
	if (unlikely(!head))
		fatal_return(("Failed to malloc head in rewrite_encrypted\n"), false);
	buf = head + SALT_LEN;
	/* Fresh random salt for this header */
	if (unlikely(!get_rand(control, head, SALT_LEN)))
		goto error;
	/* Write the salt in front of the header, then read the 25 plaintext
	 * header bytes back so they can be encrypted in place */
	if (unlikely(seekto(control, sinfo, ofs - SALT_LEN)))
		failure_goto(("Failed to seekto buf ofs in rewrite_encrypted\n"), error);
	if (unlikely(write_buf(control, head, SALT_LEN)))
		failure_goto(("Failed to write_buf head in rewrite_encrypted\n"), error);
	if (unlikely(read_buf(control, sinfo->fd, buf, 25)))
		failure_goto(("Failed to read_buf buf in rewrite_encrypted\n"), error);

	if (unlikely(!lrz_encrypt(control, buf, 25, head)))
		goto error;

	/* Overwrite the plaintext header with its encrypted form */
	if (unlikely(seekto(control, sinfo, ofs)))
		failure_goto(("Failed to seek back to ofs in rewrite_encrypted\n"), error);
	if (unlikely(write_buf(control, buf, 25)))
		failure_goto(("Failed to write_buf encrypted buf in rewrite_encrypted\n"), error);
	free(head);
	/* Fix: the restoring seek's return value was previously ignored,
	 * which could silently leave the stream at the wrong offset */
	if (unlikely(seekto(control, sinfo, cur_ofs)))
		return false;
	return true;
error:
	free(head);
	return false;
}
1258
1259 /* Enter with s_buf allocated,s_buf points to the compressed data after the
1260 * backend compression and is then freed here */
compthread(void * data)1261 static void *compthread(void *data)
1262 {
1263 stream_thread_struct *s = data;
1264 rzip_control *control = s->control;
1265 long i = s->i;
1266 struct compress_thread *cti;
1267 struct stream_info *ctis;
1268 int waited = 0, ret = 0;
1269 i64 padded_len;
1270 int write_len;
1271
1272 /* Make sure this thread doesn't already exist */
1273
1274 free(data);
1275 cti = &cthread[i];
1276 ctis = cti->sinfo;
1277
1278 if (unlikely(setpriority(PRIO_PROCESS, 0, control->nice_val) == -1))
1279 print_err("Warning, unable to set nice value on thread\n");
1280
1281 cti->c_type = CTYPE_NONE;
1282 cti->c_len = cti->s_len;
1283
1284 /* Flushing writes to disk frees up any dirty ram, improving chances
1285 * of succeeding in allocating more ram */
1286 fsync(ctis->fd);
1287 retry:
1288 /* Very small buffers have issues to do with minimum amounts of ram
1289 * allocatable to a buffer combined with the MINIMUM_MATCH of rzip
1290 * being 31 bytes so don't bother trying to compress anything less
1291 * than 64 bytes. */
1292 if (!NO_COMPRESS && cti->c_len >= 64) {
1293 if (LZMA_COMPRESS)
1294 ret = lzma_compress_buf(control, cti);
1295 else if (LZO_COMPRESS)
1296 ret = lzo_compress_buf(control, cti);
1297 else if (BZIP2_COMPRESS)
1298 ret = bzip2_compress_buf(control, cti);
1299 else if (ZLIB_COMPRESS)
1300 ret = gzip_compress_buf(control, cti);
1301 else if (ZPAQ_COMPRESS)
1302 ret = zpaq_compress_buf(control, cti, i);
1303 else failure_goto(("Dunno wtf compression to use!\n"), error);
1304 }
1305
1306 padded_len = cti->c_len;
1307 if (!ret && padded_len < MIN_SIZE) {
1308 /* We need to pad out each block to at least be CBC_LEN bytes
1309 * long or encryption cannot work. We pad it with random
1310 * data */
1311 padded_len = MIN_SIZE;
1312 cti->s_buf = realloc(cti->s_buf, MIN_SIZE);
1313 if (unlikely(!cti->s_buf))
1314 fatal_goto(("Failed to realloc s_buf in compthread\n"), error);
1315 if (unlikely(!get_rand(control, cti->s_buf + cti->c_len, MIN_SIZE - cti->c_len)))
1316 goto error;
1317 }
1318
1319 /* If compression fails for whatever reason multithreaded, then wait
1320 * for the previous thread to finish, serialising the work to decrease
1321 * the memory requirements, increasing the chance of success */
1322 if (unlikely(ret && waited))
1323 failure_goto(("Failed to compress in compthread\n"), error);
1324
1325 if (!waited) {
1326 lock_mutex(control, &output_lock);
1327 while (output_thread != i)
1328 cond_wait(control, &output_cond, &output_lock);
1329 unlock_mutex(control, &output_lock);
1330 waited = 1;
1331 }
1332 if (unlikely(ret)) {
1333 print_maxverbose("Unable to compress in parallel, waiting for previous thread to complete before trying again\n");
1334 goto retry;
1335 }
1336
1337 /* Need to be big enough to fill one CBC_LEN */
1338 if (ENCRYPT)
1339 write_len = 8;
1340 else
1341 write_len = ctis->chunk_bytes;
1342
1343 if (!ctis->chunks++) {
1344 int j;
1345
1346 if (TMP_OUTBUF) {
1347 lock_mutex(control, &control->control_lock);
1348 if (!control->magic_written)
1349 write_magic(control);
1350 unlock_mutex(control, &control->control_lock);
1351
1352 if (unlikely(!flush_tmpoutbuf(control))) {
1353 print_err("Failed to flush_tmpoutbuf in compthread\n");
1354 goto error;
1355 }
1356 }
1357
1358 print_maxverbose("Writing initial chunk bytes value %d at %lld\n",
1359 ctis->chunk_bytes, get_seek(control, ctis->fd));
1360 /* Write chunk bytes of this block */
1361 write_u8(control, ctis->chunk_bytes);
1362
1363 /* Write whether this is the last chunk, followed by the size
1364 * of this chunk */
1365 print_maxverbose("Writing EOF flag as %d\n", control->eof);
1366 write_u8(control, control->eof);
1367 if (!ENCRYPT)
1368 write_val(control, ctis->size, ctis->chunk_bytes);
1369
1370 /* First chunk of this stream, write headers */
1371 ctis->initial_pos = get_seek(control, ctis->fd);
1372 if (unlikely(ctis->initial_pos == -1))
1373 goto error;
1374
1375 print_maxverbose("Writing initial header at %lld\n", ctis->initial_pos);
1376 for (j = 0; j < ctis->num_streams; j++) {
1377 /* If encrypting, we leave SALT_LEN room to write in salt
1378 * later */
1379 if (ENCRYPT) {
1380 if (unlikely(write_val(control, 0, SALT_LEN)))
1381 fatal_goto(("Failed to write_buf blank salt in compthread %d\n", i), error);
1382 ctis->cur_pos += SALT_LEN;
1383 }
1384 ctis->s[j].last_head = ctis->cur_pos + 1 + (write_len * 2);
1385 write_u8(control, CTYPE_NONE);
1386 write_val(control, 0, write_len);
1387 write_val(control, 0, write_len);
1388 write_val(control, 0, write_len);
1389 ctis->cur_pos += 1 + (write_len * 3);
1390 }
1391 }
1392
1393 print_maxverbose("Compthread %ld seeking to %lld to store length %d\n", i, ctis->s[cti->streamno].last_head, write_len);
1394
1395 if (unlikely(seekto(control, ctis, ctis->s[cti->streamno].last_head)))
1396 fatal_goto(("Failed to seekto in compthread %d\n", i), error);
1397
1398 if (unlikely(write_val(control, ctis->cur_pos, write_len)))
1399 fatal_goto(("Failed to write_val cur_pos in compthread %d\n", i), error);
1400
1401 if (ENCRYPT)
1402 rewrite_encrypted(control, ctis, ctis->s[cti->streamno].last_head - 17);
1403
1404 ctis->s[cti->streamno].last_head = ctis->cur_pos + 1 + (write_len * 2) + (ENCRYPT ? SALT_LEN : 0);
1405
1406 print_maxverbose("Compthread %ld seeking to %lld to write header\n", i, ctis->cur_pos);
1407
1408 if (unlikely(seekto(control, ctis, ctis->cur_pos)))
1409 fatal_goto(("Failed to seekto cur_pos in compthread %d\n", i), error);
1410
1411 print_maxverbose("Thread %ld writing %lld compressed bytes from stream %d\n", i, padded_len, cti->streamno);
1412
1413 if (ENCRYPT) {
1414 if (unlikely(write_val(control, 0, SALT_LEN)))
1415 fatal_goto(("Failed to write_buf header salt in compthread %d\n", i), error);
1416 ctis->cur_pos += SALT_LEN;
1417 ctis->s[cti->streamno].last_headofs = ctis->cur_pos;
1418 }
1419 /* We store the actual c_len even though we might pad it out */
1420 if (unlikely(write_u8(control, cti->c_type) ||
1421 write_val(control, cti->c_len, write_len) ||
1422 write_val(control, cti->s_len, write_len) ||
1423 write_val(control, 0, write_len))) {
1424 fatal_goto(("Failed write in compthread %d\n", i), error);
1425 }
1426 ctis->cur_pos += 1 + (write_len * 3);
1427
1428 if (ENCRYPT) {
1429 if (unlikely(!get_rand(control, cti->salt, SALT_LEN)))
1430 goto error;
1431 if (unlikely(write_buf(control, cti->salt, SALT_LEN)))
1432 fatal_goto(("Failed to write_buf block salt in compthread %d\n", i), error);
1433 if (unlikely(!lrz_encrypt(control, cti->s_buf, padded_len, cti->salt)))
1434 goto error;
1435 ctis->cur_pos += SALT_LEN;
1436 }
1437
1438 print_maxverbose("Compthread %ld writing data at %lld\n", i, ctis->cur_pos);
1439
1440 if (unlikely(write_buf(control, cti->s_buf, padded_len)))
1441 fatal_goto(("Failed to write_buf s_buf in compthread %d\n", i), error);
1442
1443 ctis->cur_pos += padded_len;
1444 free(cti->s_buf);
1445
1446 lock_mutex(control, &output_lock);
1447 if (++output_thread == control->threads)
1448 output_thread = 0;
1449 cond_broadcast(control, &output_cond);
1450 unlock_mutex(control, &output_lock);
1451
1452 error:
1453 cksem_post(control, &cti->cksem);
1454
1455 return NULL;
1456 }
1457
/* Hand the current buffer of a stream to the next compress_thread slot and
 * start a detached compthread on it. If newbuf is set, allocate a fresh
 * buffer so the stream can keep accepting data. The static slot index
 * round-robins over control->threads; the semaphore wait blocks until the
 * previous occupant of the slot has finished. */
static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int streamno, int newbuf)
{
	stream_thread_struct *s;
	static int i = 0;

	/* Make sure this thread doesn't already exist */
	cksem_wait(control, &cthread[i].cksem);

	cthread[i].sinfo = sinfo;
	cthread[i].streamno = streamno;
	cthread[i].s_buf = sinfo->s[streamno].buf;
	cthread[i].s_len = sinfo->s[streamno].buflen;

	/* Fix: %d for i which is int (was %ld - UB in varargs) */
	print_maxverbose("Starting thread %d to compress %lld bytes from stream %d\n",
			 i, cthread[i].s_len, streamno);

	s = malloc(sizeof(stream_thread_struct));
	if (unlikely(!s)) {
		cksem_post(control, &cthread[i].cksem);
		failure("Unable to malloc in clear_buffer");
	}
	s->i = i;
	s->control = control;
	/* compthread frees s and posts the semaphore when done */
	if (unlikely((!create_pthread(control, &threads[i], NULL, compthread, s)) ||
	    (!detach_pthread(control, &threads[i]))))
		failure("Unable to create compthread in clear_buffer");

	if (newbuf) {
		/* The stream buffer has been given to the thread, allocate a
		 * new one. */
		sinfo->s[streamno].buf = malloc(sinfo->bufsize);
		if (unlikely(!sinfo->s[streamno].buf))
			failure("Unable to malloc buffer of size %lld in clear_buffer\n", sinfo->bufsize);
		sinfo->s[streamno].buflen = 0;
	}

	if (++i == control->threads)
		i = 0;
}
1497
1498 /* flush out any data in a stream buffer */
void flush_buffer(rzip_control *control, struct stream_info *sinfo, int streamno)
{
	/* newbuf = 1: the current buffer is handed to a compression thread
	 * and a fresh one is allocated so the stream keeps accepting data */
	clear_buffer(control, sinfo, streamno, 1);
}
1503
/* Worker thread: decompress the block staged in ucthread[i] according to
 * its c_type. On failure it waits for its turn in the output order and
 * retries once serialised (lower peak memory), failing hard if that also
 * fails. Started by fill_buffer and joined there via join_pthread. */
static void *ucompthread(void *data)
{
	stream_thread_struct *s = data;
	rzip_control *control = s->control;
	long i = s->i;
	struct uncomp_thread *uci;
	int waited = 0, ret = 0;

	/* The argument struct only conveys i and control; free it now */
	free(data);
	uci = &ucthread[i];

	if (unlikely(setpriority(PRIO_PROCESS, 0, control->nice_val) == -1))
		print_err("Warning, unable to set nice value on thread\n");

retry:
	/* CTYPE_NONE blocks are stored uncompressed; nothing to do */
	if (uci->c_type != CTYPE_NONE) {
		switch (uci->c_type) {
		case CTYPE_LZMA:
			ret = lzma_decompress_buf(control, uci);
			break;
		case CTYPE_LZO:
			ret = lzo_decompress_buf(control, uci);
			break;
		case CTYPE_BZIP2:
			ret = bzip2_decompress_buf(control, uci);
			break;
		case CTYPE_GZIP:
			ret = gzip_decompress_buf(control, uci);
			break;
		case CTYPE_ZPAQ:
			ret = zpaq_decompress_buf(control, uci, i);
			break;
		default:
			failure_return(("Dunno wtf decompression type to use!\n"), NULL);
			break;
		}
	}

	/* As per compression, serialise the decompression if it fails in
	 * parallel */
	if (unlikely(ret)) {
		if (unlikely(waited))
			failure_return(("Failed to decompress in ucompthread\n"), NULL);
		print_maxverbose("Unable to decompress in parallel, waiting for previous thread to complete before trying again\n");
		/* We do not strictly need to wait for this, so it's used when
		 * decompression fails due to inadequate memory to try again
		 * serialised. */
		lock_mutex(control, &output_lock);
		while (output_thread != i)
			cond_wait(control, &output_cond, &output_lock);
		unlock_mutex(control, &output_lock);
		waited = 1;
		goto retry;
	}

	print_maxverbose("Thread %ld decompressed %lld bytes from stream %d\n", i, uci->u_len, uci->streamno);

	return 0;
}
1563
1564 /* fill a buffer from a stream - return -1 on failure */
/* fill a buffer from a stream - return -1 on failure.
 * Reads the next block header for the stream, stages the (still compressed)
 * data into a free ucthread slot and starts a decompression thread; keeps
 * reading ahead into further slots while threads and ram allow, then joins
 * the next in-order thread and takes its decompressed output. */
static int fill_buffer(rzip_control *control, struct stream_info *sinfo, int streamno)
{
	i64 u_len, c_len, last_head, padded_len, header_length;
	uchar enc_head[25 + SALT_LEN], blocksalt[SALT_LEN];
	struct stream *s = &sinfo->s[streamno];
	stream_thread_struct *st;
	uchar c_type, *s_buf;

	if (s->buf)
		free(s->buf);
	/* Fix: clear the stale pointer so an error return below cannot lead
	 * to a double free when close_stream_in frees the stream buffers */
	s->buf = NULL;
	if (s->eos)
		goto out;
fill_another:
	if (unlikely(ucthread[s->uthread_no].busy))
		failure_return(("Trying to start a busy thread, this shouldn't happen!\n"), -1);

	if (unlikely(read_seekto(control, sinfo, s->last_head)))
		return -1;

	if (ENCRYPT) {
		if (unlikely(read_buf(control, sinfo->fd, enc_head, SALT_LEN)))
			return -1;
		sinfo->total_read += SALT_LEN;
	}

	if (unlikely(read_u8(control, sinfo->fd, &c_type)))
		return -1;

	/* Compatibility crap for versions < 0.4 */
	if (control->major_version == 0 && control->minor_version < 4) {
		u32 c_len32, u_len32, last_head32;

		if (unlikely(read_u32(control, sinfo->fd, &c_len32)))
			return -1;
		if (unlikely(read_u32(control, sinfo->fd, &u_len32)))
			return -1;
		if (unlikely(read_u32(control, sinfo->fd, &last_head32)))
			return -1;
		c_len = c_len32;
		u_len = u_len32;
		last_head = last_head32;
		header_length = 13;
	} else {
		int read_len;

		print_maxverbose("Reading ucomp header at %lld\n", get_readseek(control, sinfo->fd));
		/* Pre-0.6 and encrypted archives use full 8 byte widths */
		if ((control->major_version == 0 && control->minor_version < 6) || ENCRYPT)
			read_len = 8;
		else
			read_len = sinfo->chunk_bytes;
		if (unlikely(read_val(control, sinfo->fd, &c_len, read_len)))
			return -1;
		if (unlikely(read_val(control, sinfo->fd, &u_len, read_len)))
			return -1;
		if (unlikely(read_val(control, sinfo->fd, &last_head, read_len)))
			return -1;
		header_length = 1 + (read_len * 3);
	}
	sinfo->total_read += header_length;

	if (ENCRYPT) {
		if (unlikely(!decrypt_header(control, enc_head, &c_type, &c_len, &u_len, &last_head)))
			return -1;
		if (unlikely(read_buf(control, sinfo->fd, blocksalt, SALT_LEN)))
			return -1;
		sinfo->total_read += SALT_LEN;
	}
	/* On-disk values are little-endian */
	c_len = le64toh(c_len);
	u_len = le64toh(u_len);
	last_head = le64toh(last_head);
	print_maxverbose("Fill_buffer stream %d c_len %lld u_len %lld last_head %lld\n", streamno, c_len, u_len, last_head);

	/* Compressed blocks are padded up to MIN_SIZE when encrypting */
	padded_len = MAX(c_len, MIN_SIZE);
	sinfo->total_read += padded_len;
	fsync(control->fd_out);

	if (unlikely(u_len > control->maxram))
		fatal_return(("Unable to malloc buffer of size %lld in this environment\n", u_len), -1);
	s_buf = malloc(MAX(u_len, MIN_SIZE));
	if (unlikely(u_len && !s_buf))
		fatal_return(("Unable to malloc buffer of size %lld in fill_buffer\n", u_len), -1);
	sinfo->ram_alloced += u_len;

	if (unlikely(read_buf(control, sinfo->fd, s_buf, padded_len))) {
		/* Fix: s_buf was previously leaked on this path */
		free(s_buf);
		return -1;
	}

	if (ENCRYPT) {
		if (unlikely(!lrz_decrypt(control, s_buf, padded_len, blocksalt))) {
			/* Fix: s_buf was previously leaked on this path */
			free(s_buf);
			return -1;
		}
	}

	/* Stage the block for the decompression thread */
	ucthread[s->uthread_no].s_buf = s_buf;
	ucthread[s->uthread_no].c_len = c_len;
	ucthread[s->uthread_no].u_len = u_len;
	ucthread[s->uthread_no].c_type = c_type;
	ucthread[s->uthread_no].streamno = streamno;
	s->last_head = last_head;

	/* List this thread as busy */
	ucthread[s->uthread_no].busy = 1;
	print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n",
			 s->uthread_no, padded_len, streamno);

	st = malloc(sizeof(stream_thread_struct));
	if (unlikely(!st))
		fatal_return(("Unable to malloc in fill_buffer"), -1);
	st->i = s->uthread_no;
	st->control = control;
	if (unlikely(!create_pthread(control, &threads[s->uthread_no], NULL, ucompthread, st))) {
		free(st);
		/* Fix: release the staged buffer and mark the slot free */
		ucthread[s->uthread_no].busy = 0;
		free(s_buf);
		return -1;
	}

	if (++s->uthread_no == s->base_thread + s->total_threads)
		s->uthread_no = s->base_thread;

	/* Reached the end of this stream, no more data to read in, otherwise
	 * see if the next thread is free to grab more data. We also check that
	 * we're not going to be allocating too much ram to generate all these
	 * threads. */
	if (!last_head)
		s->eos = 1;
	else if (s->uthread_no != s->unext_thread && !ucthread[s->uthread_no].busy &&
		 sinfo->ram_alloced < control->maxram)
		goto fill_another;
out:
	lock_mutex(control, &output_lock);
	output_thread = s->unext_thread;
	cond_broadcast(control, &output_cond);
	unlock_mutex(control, &output_lock);

	/* join_pthread here will make it wait till the data is ready */
	if (unlikely(!join_pthread(control, threads[s->unext_thread], NULL)))
		return -1;
	ucthread[s->unext_thread].busy = 0;

	print_maxverbose("Taking decompressed data from thread %ld\n", s->unext_thread);
	s->buf = ucthread[s->unext_thread].s_buf;
	s->buflen = ucthread[s->unext_thread].u_len;
	sinfo->ram_alloced -= s->buflen;
	s->bufp = 0;

	if (++s->unext_thread == s->base_thread + s->total_threads)
		s->unext_thread = s->base_thread;

	return 0;
}
1712
/* write some data to a stream buffer, handing full buffers off to
   compression threads; aborts internally on unrecoverable failure */
/* Append len bytes from p to a stream's buffer, dispatching each full
 * bufsize-worth to a compression thread via flush_buffer. */
void write_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 len)
{
	struct stream_info *sinfo = ss;
	struct stream *strm = &sinfo->s[streamno];

	while (len > 0) {
		i64 room = sinfo->bufsize - strm->buflen;
		i64 chunk = (len < room) ? len : room;

		memcpy(strm->buf + strm->buflen, p, chunk);
		strm->buflen += chunk;
		p += chunk;
		len -= chunk;

		/* Flush the buffer every sinfo->bufsize into one thread */
		if (strm->buflen == sinfo->bufsize)
			flush_buffer(control, sinfo, streamno);
	}
}
1733
1734 /* read some data from a stream. Return number of bytes read, or -1
1735 on failure */
/* Copy up to len decompressed bytes from a stream into p, refilling the
 * stream buffer from disk as it empties. Returns bytes copied or -1. */
i64 read_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 len)
{
	struct stream_info *sinfo = ss;
	struct stream *strm = &sinfo->s[streamno];
	i64 copied = 0;

	while (len > 0) {
		i64 avail = strm->buflen - strm->bufp;

		if (avail > len)
			avail = len;
		if (avail > 0) {
			memcpy(p, strm->buf + strm->bufp, avail);
			strm->bufp += avail;
			p += avail;
			len -= avail;
			copied += avail;
		}

		/* Buffer exhausted but the caller wants more: refill. If the
		 * refill produced nothing, the stream is done. */
		if (len && strm->bufp == strm->buflen) {
			if (unlikely(fill_buffer(control, sinfo, streamno)))
				return -1;
			if (strm->bufp == strm->buflen)
				break;
		}
	}

	return copied;
}
1764
1765 /* flush and close down a stream. return -1 on failure */
/* flush and close down a stream. return -1 on failure */
int close_stream_out(rzip_control *control, void *ss)
{
	struct stream_info *sinfo = ss;
	int i;

	/* Dispatch whatever is left in each stream buffer; newbuf = 0 as no
	 * more data will be written to these streams */
	for (i = 0; i < sinfo->num_streams; i++)
		clear_buffer(control, sinfo, i, 0);

	if (ENCRYPT) {
		/* Last two compressed blocks do not have an offset written
		 * to them so we have to go back and encrypt them now, but we
		 * must wait till the threads return. */
		int close_thread = output_thread;

		/* wait/post on every slot in dispatch order to ensure all
		 * compression threads have finished writing */
		for (i = 0; i < control->threads; i++) {
			cksem_wait(control, &cthread[close_thread].cksem);
			cksem_post(control, &cthread[close_thread].cksem);
			if (++close_thread == control->threads)
				close_thread = 0;
		}
		for (i = 0; i < sinfo->num_streams; i++)
			rewrite_encrypted(control, sinfo, sinfo->s[i].last_headofs);
	}
	/* In library mode sinfo cannot be freed yet (see the note below), so
	 * queue it in buckets of STREAM_BUCKET_SIZE pointers for later
	 * cleanup when the whole operation completes */
	if (control->library_mode) {
		if (!control->sinfo_buckets) {
			/* no streams added */
			control->sinfo_queue = calloc(STREAM_BUCKET_SIZE + 1, sizeof(void*));
			if (!control->sinfo_queue) {
				print_err("Failed to calloc sinfo_queue in close_stream_out\n");
				return -1;
			}
			control->sinfo_buckets++;
		} else if (control->sinfo_idx == STREAM_BUCKET_SIZE * control->sinfo_buckets + 1) {
			/* all buckets full, create new bucket */
			void *tmp;

			tmp = realloc(control->sinfo_queue, (++control->sinfo_buckets * STREAM_BUCKET_SIZE + 1) * sizeof(void*));
			if (!tmp) {
				print_err("Failed to realloc sinfo_queue in close_stream_out\n");
				return -1;
			}
			control->sinfo_queue = tmp;
			/* zero the newly added tail of the queue */
			memset(control->sinfo_queue + control->sinfo_idx, 0, ((control->sinfo_buckets * STREAM_BUCKET_SIZE + 1) - control->sinfo_idx) * sizeof(void*));
		}
		control->sinfo_queue[control->sinfo_idx++] = sinfo;
	}
#if 0
	/* These cannot be freed immediately because their values are read after the next
	 * stream has started. Instead (in library mode), they are stored and only freed
	 * after the entire operation has completed.
	 */
	free(sinfo->s);
	free(sinfo);
#endif
	return 0;
}
1822
1823 /* close down an input stream */
/* close down an input stream */
int close_stream_in(rzip_control *control, void *ss)
{
	struct stream_info *sinfo = ss;
	int stream;

	print_maxverbose("Closing stream at %lld, want to seek to %lld\n",
			 get_readseek(control, control->fd_in),
			 sinfo->initial_pos + sinfo->total_read);
	/* Leave the fd positioned just past everything this chunk consumed */
	if (unlikely(read_seekto(control, sinfo, sinfo->total_read)))
		return -1;

	/* Release per-stream buffers, then the global thread bookkeeping,
	 * then the stream_info itself */
	for (stream = 0; stream < sinfo->num_streams; stream++)
		free(sinfo->s[stream].buf);

	output_thread = 0;

	free(ucthread);
	free(threads);
	free(sinfo->s);
	free(sinfo);

	return 0;
}
1846
1847 /* As others are slow and lzo very fast, it is worth doing a quick lzo pass
1848 to see if there is any compression at all with lzo first. It is unlikely
1849 that others will be able to compress if lzo is unable to drop a single byte
1850 so do not compress any block that is incompressible by lzo. */
static int lzo_compresses(rzip_control *control, uchar *s_buf, i64 s_len)
{
	lzo_bytep wrkmem = NULL;
	lzo_uint in_len, test_len = s_len, save_len = s_len;
	lzo_uint dlen;
	uchar *c_buf = NULL, *test_buf = s_buf;
	/* set minimum buffer test size based on the length of the test stream */
	unsigned long buftest_size = (test_len > 5 * STREAM_BUFSIZE ? STREAM_BUFSIZE : STREAM_BUFSIZE / 4096);
	int ret = 0;
	int workcounter = 0;	/* count # of passes */
	lzo_uint best_dlen = UINT_MAX; /* save best compression estimate */

	if (!LZO_TEST)
		return 1;
	wrkmem = (lzo_bytep) malloc(LZO1X_1_MEM_COMPRESS);
	if (unlikely(wrkmem == NULL))
		fatal_return(("Unable to allocate wrkmem in lzo_compresses\n"), 0);

	in_len = MIN(test_len, buftest_size);
	/* Worst-case lzo1x output size for a STREAM_BUFSIZE input */
	dlen = STREAM_BUFSIZE + STREAM_BUFSIZE / 16 + 64 + 3;

	c_buf = malloc(dlen);
	if (unlikely(!c_buf)) {
		free(wrkmem);
		fatal_return(("Unable to allocate c_buf in lzo_compresses\n"), 0);
	}

	/* Test progressively larger blocks at a time and as soon as anything
	   compressible is found, jump out as a success */
	while (test_len > 0) {
		workcounter++;
		lzo1x_1_compress(test_buf, in_len, (uchar *)c_buf, &dlen, wrkmem);

		if (dlen < best_dlen)
			best_dlen = dlen;	/* save best value */

		if (dlen < in_len) {
			ret = 1;
			break;
		}
		/* expand and move buffer */
		test_len -= in_len;
		if (test_len) {
			test_buf += (ptrdiff_t)in_len;
			if (buftest_size < STREAM_BUFSIZE)
				buftest_size <<= 1;
			in_len = MIN(test_len, buftest_size);
		}
	}
	/* Fix: save_len is lzo_uint (unsigned, possibly 64-bit) so %ld was a
	 * varargs type mismatch; cast explicitly. Also guard the ratio
	 * against a zero-length final in_len for empty input. */
	print_maxverbose("lzo testing %s for chunk %lu. Compressed size = %5.2F%% of chunk, %d Passes\n",
			 (ret == 0 ? "FAILED" : "OK"), (unsigned long)save_len,
			 in_len ? 100 * ((double)best_dlen / (double)in_len) : 0.0, workcounter);

	free(wrkmem);
	free(c_buf);

	return ret;
}
1909