1 /*
2    Copyright (C) 2011 Serge Belyshev
3    Copyright (C) 2006-2016 Con Kolivas
4    Copyright (C) 2011 Peter Hyman
5    Copyright (C) 1998 Andrew Tridgell
6 
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 2 of the License, or
10    (at your option) any later version.
11 
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16 
17    You should have received a copy of the GNU General Public License
18    along with this program. If not, see <http://www.gnu.org/licenses/>.
19 */
20 /* multiplex N streams into a file - the streams are passed
21    through different compressors */
22 
23 #ifdef HAVE_CONFIG_H
24 # include "config.h"
25 #endif
26 
27 #ifdef HAVE_SYS_TIME_H
28 # include <sys/time.h>
29 #endif
30 #ifdef HAVE_SYS_TYPES_H
31 # include <sys/types.h>
32 #endif
33 #ifdef HAVE_SYS_RESOURCE_H
34 # include <sys/resource.h>
35 #endif
36 #ifdef HAVE_UNISTD_H
37 # include <unistd.h>
38 #endif
39 #include <sys/statvfs.h>
40 #include <pthread.h>
41 #include <bzlib.h>
42 #include <zlib.h>
43 #include <lzo/lzoconf.h>
44 #include <lzo/lzo1x.h>
45 #ifdef HAVE_ERRNO_H
46 # include <errno.h>
47 #endif
48 #ifdef HAVE_ENDIAN_H
49 # include <endian.h>
50 #elif HAVE_SYS_ENDIAN_H
51 # include <sys/endian.h>
52 #endif
53 #ifdef HAVE_ARPA_INET_H
54 # include <arpa/inet.h>
55 #endif
56 
57 /* LZMA C Wrapper */
58 #include "lzma/C/LzmaLib.h"
59 
60 #include "util.h"
61 #include "lrzip_core.h"
62 
63 #define STREAM_BUFSIZE (1024 * 1024 * 10)
64 
/* Per-thread state for the compression side. One entry per worker thread;
 * s_buf holds the uncompressed data on entry and is swapped for the
 * compressed buffer on success. */
static struct compress_thread{
	uchar *s_buf;	/* Uncompressed buffer -> Compressed buffer */
	uchar c_type;	/* Compression type */
	i64 s_len;	/* Data length uncompressed */
	i64 c_len;	/* Data length compressed */
	cksem_t cksem;  /* This thread's semaphore */
	struct stream_info *sinfo;	/* Owning stream set */
	int streamno;	/* Which stream within sinfo this block belongs to */
	uchar salt[SALT_LEN];	/* Per-block salt (used when encrypting) */
} *cthread;
75 
/* Per-thread state for the decompression side. */
static struct uncomp_thread{
	uchar *s_buf;		/* Compressed buffer on entry -> uncompressed on success */
	i64 u_len, c_len;	/* Uncompressed and compressed lengths */
	i64 last_head;		/* Offset of the next block header in the stream */
	uchar c_type;		/* Compression type of this block */
	int busy;		/* Non-zero while a worker owns this slot */
	int streamno;		/* Stream this block belongs to */
} *ucthread;
84 
/* Argument bundle passed to stream worker threads: the slot index and the
 * global control state. */
typedef struct stream_thread_struct {
	int i;			/* Thread/slot number */
	rzip_control *control;	/* Shared control state */
} stream_thread_struct;
89 
/* Index of the thread whose output must be written next; threads wait on
 * output_cond until output_thread reaches their own number so blocks are
 * written out in order. */
static long output_thread;
static pthread_mutex_t output_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t output_cond = PTHREAD_COND_INITIALIZER;
static pthread_t *threads;	/* Worker thread handles, sized at runtime */
94 
init_mutex(rzip_control * control,pthread_mutex_t * mutex)95 bool init_mutex(rzip_control *control, pthread_mutex_t *mutex)
96 {
97 	if (unlikely(pthread_mutex_init(mutex, NULL)))
98 		fatal_return(("pthread_mutex_init failed"), false);
99 	return true;
100 }
101 
unlock_mutex(rzip_control * control,pthread_mutex_t * mutex)102 bool unlock_mutex(rzip_control *control, pthread_mutex_t *mutex)
103 {
104 	if (unlikely(pthread_mutex_unlock(mutex)))
105 		fatal_return(("pthread_mutex_unlock failed"), false);
106 	return true;
107 }
108 
lock_mutex(rzip_control * control,pthread_mutex_t * mutex)109 bool lock_mutex(rzip_control *control, pthread_mutex_t *mutex)
110 {
111 	if (unlikely(pthread_mutex_lock(mutex)))
112 		fatal_return(("pthread_mutex_lock failed"), false);
113 	return true;
114 }
115 
cond_wait(rzip_control * control,pthread_cond_t * cond,pthread_mutex_t * mutex)116 static bool cond_wait(rzip_control *control, pthread_cond_t *cond, pthread_mutex_t *mutex)
117 {
118 	if (unlikely(pthread_cond_wait(cond, mutex)))
119 		fatal_return(("pthread_cond_wait failed"), false);
120 	return true;
121 }
122 
cond_broadcast(rzip_control * control,pthread_cond_t * cond)123 static bool cond_broadcast(rzip_control *control, pthread_cond_t *cond)
124 {
125 	if (unlikely(pthread_cond_broadcast(cond)))
126 		fatal_return(("pthread_cond_broadcast failed"), false);
127 	return true;
128 }
129 
create_pthread(rzip_control * control,pthread_t * thread,pthread_attr_t * attr,void * (* start_routine)(void *),void * arg)130 bool create_pthread(rzip_control *control, pthread_t *thread, pthread_attr_t * attr,
131 	void * (*start_routine)(void *), void *arg)
132 {
133 	if (unlikely(pthread_create(thread, attr, start_routine, arg)))
134 		fatal_return(("pthread_create"), false);
135 	return true;
136 }
137 
detach_pthread(rzip_control * control,pthread_t * thread)138 bool detach_pthread(rzip_control *control, pthread_t *thread)
139 {
140 	if (unlikely(pthread_detach(*thread)))
141 		fatal_return(("pthread_detach"), false);
142 	return true;
143 }
144 
join_pthread(rzip_control * control,pthread_t th,void ** thread_return)145 bool join_pthread(rzip_control *control, pthread_t th, void **thread_return)
146 {
147 	if (pthread_join(th, thread_return))
148 		fatal_return(("pthread_join"), false);
149 	return true;
150 }
151 
/* Forward declaration: lzo_compresses() is a cheap compressibility probe
 * used by the slower back ends; its body lives at the end of the file
 * since it is a work function. */
static int lzo_compresses(rzip_control *control, uchar *s_buf, i64 s_len);
156 
157 /*
158   ***** COMPRESSION FUNCTIONS *****
159 
160   ZPAQ, BZIP, GZIP, LZMA, LZO
161 
162   try to compress a buffer. If compression fails for whatever reason then
163   leave uncompressed. Return the compression type in c_type and resulting
164   length in c_len
165 */
166 
/* Compress a thread's block with the ZPAQ back end.
 * On success the thread's s_buf is swapped for the compressed buffer,
 * c_len/c_type updated. Incompressible data is left untouched as
 * CTYPE_NONE. Returns 0 on success or harmless skip, -1 on failure. */
static int zpaq_compress_buf(rzip_control *control, struct compress_thread *cthread, long thread)
{
	i64 c_len, c_size;
	uchar *c_buf;

	/* Cheap LZO probe: if even LZO cannot shrink this data, skip the
	 * far more expensive ZPAQ pass entirely. */
	if (!lzo_compresses(control, cthread->s_buf, cthread->s_len))
		return 0;

	/* Allow 10000 bytes of headroom for ZPAQ worst-case expansion,
	 * rounded up to a whole page. */
	c_size = round_up_page(control, cthread->s_len + 10000);
	c_buf = malloc(c_size);
	if (!c_buf) {
		print_err("Unable to allocate c_buf in zpaq_compress_buf\n");
		return -1;
	}

	c_len = 0;

	/* Map compression_level 1-9 onto the zpaq level range (level/4 + 1). */
	zpaq_compress(c_buf, &c_len, cthread->s_buf, cthread->s_len, control->compression_level / 4 + 1,
		      control->msgout, SHOW_PROGRESS ? true: false, thread);

	if (unlikely(c_len >= cthread->c_len)) {
		print_maxverbose("Incompressible block\n");
		/* Incompressible, leave as CTYPE_NONE */
		free(c_buf);
		return 0;
	}

	/* Success: take ownership of the compressed buffer. */
	cthread->c_len = c_len;
	free(cthread->s_buf);
	cthread->s_buf = c_buf;
	cthread->c_type = CTYPE_ZPAQ;
	return 0;
}
200 
/* Compress a thread's block with bzip2.
 * On success the thread's s_buf is swapped for the compressed buffer.
 * Incompressible data is left untouched as CTYPE_NONE. Returns 0 on
 * success/skip, -1 on failure.
 * NOTE(review): dlen is u32 as required by the bzlib API; this assumes
 * s_len fits in 32 bits — presumably guaranteed by upstream block-size
 * limits, verify against the caller. */
static int bzip2_compress_buf(rzip_control *control, struct compress_thread *cthread)
{
	u32 dlen = round_up_page(control, cthread->s_len);
	int bzip2_ret;
	uchar *c_buf;

	/* Skip bzip2 entirely if a quick LZO probe says the data is
	 * incompressible. */
	if (!lzo_compresses(control, cthread->s_buf, cthread->s_len))
		return 0;

	c_buf = malloc(dlen);
	if (!c_buf) {
		print_err("Unable to allocate c_buf in bzip2_compress_buf\n");
		return -1;
	}

	/* blockSize100k = compression_level, workFactor = level * 10. */
	bzip2_ret = BZ2_bzBuffToBuffCompress((char *)c_buf, &dlen,
		(char *)cthread->s_buf, cthread->s_len,
		control->compression_level, 0, control->compression_level * 10);

	/* if compressed data is bigger then original data leave as
	 * CTYPE_NONE */

	if (bzip2_ret == BZ_OUTBUFF_FULL) {
		print_maxverbose("Incompressible block\n");
		/* Incompressible, leave as CTYPE_NONE */
		free(c_buf);
		return 0;
	}

	if (unlikely(bzip2_ret != BZ_OK)) {
		free(c_buf);
		print_maxverbose("BZ2 compress failed\n");
		return -1;
	}

	if (unlikely(dlen >= cthread->c_len)) {
		print_maxverbose("Incompressible block\n");
		/* Incompressible, leave as CTYPE_NONE */
		free(c_buf);
		return 0;
	}

	/* Success: take ownership of the compressed buffer. */
	cthread->c_len = dlen;
	free(cthread->s_buf);
	cthread->s_buf = c_buf;
	cthread->c_type = CTYPE_BZIP2;
	return 0;
}
249 
/* Compress a thread's block with zlib (gzip back end).
 * On success the thread's s_buf is swapped for the compressed buffer.
 * Incompressible data is left untouched as CTYPE_NONE. Returns 0 on
 * success/skip, -1 on failure. */
static int gzip_compress_buf(rzip_control *control, struct compress_thread *cthread)
{
	unsigned long dlen = round_up_page(control, cthread->s_len);
	int zret;
	uchar *comp_buf = malloc(dlen);

	if (!comp_buf) {
		print_err("Unable to allocate c_buf in gzip_compress_buf\n");
		return -1;
	}

	zret = compress2(comp_buf, &dlen, cthread->s_buf, cthread->s_len,
		control->compression_level);

	/* Z_BUF_ERROR means the output would have been at least as large as
	 * the input: keep the block uncompressed (CTYPE_NONE). */
	if (zret == Z_BUF_ERROR) {
		print_maxverbose("Incompressible block\n");
		free(comp_buf);
		return 0;
	}

	if (unlikely(zret != Z_OK)) {
		free(comp_buf);
		print_maxverbose("compress2 failed\n");
		return -1;
	}

	/* Even on Z_OK, only keep the result if it is strictly smaller. */
	if (unlikely((i64)dlen >= cthread->c_len)) {
		print_maxverbose("Incompressible block\n");
		free(comp_buf);
		return 0;
	}

	/* Success: take ownership of the compressed buffer. */
	cthread->c_len = dlen;
	free(cthread->s_buf);
	cthread->s_buf = comp_buf;
	cthread->c_type = CTYPE_GZIP;
	return 0;
}
294 
/* Compress a thread's block with LZMA.
 * On success s_buf is swapped for the compressed buffer and the encoded
 * lzma properties are stored once in control->lzma_properties (needed at
 * decompression time). On SZ_ERROR_MEM the dictionary is shrunk by
 * retrying at lower levels, finally falling back to bzip2. Returns 0 on
 * success/skip, -1 on failure. */
static int lzma_compress_buf(rzip_control *control, struct compress_thread *cthread)
{
	unsigned char lzma_properties[5]; /* lzma properties, encoded */
	int lzma_level, lzma_ret;
	size_t prop_size = 5; /* return value for lzma_properties */
	uchar *c_buf;
	size_t dlen;

	/* Skip LZMA entirely if a quick LZO probe says the data is
	 * incompressible. */
	if (!lzo_compresses(control, cthread->s_buf, cthread->s_len))
		return 0;

	/* only 7 levels with lzma, scale them */
	lzma_level = control->compression_level * 7 / 9;
	if (!lzma_level)
		lzma_level = 1;

	print_maxverbose("Starting lzma back end compression thread...\n");
retry:
	dlen = round_up_page(control, cthread->s_len);
	c_buf = malloc(dlen);
	if (!c_buf) {
		print_err("Unable to allocate c_buf in lzma_compress_buf\n");
		return -1;
	}

	/* with LZMA SDK 4.63, we pass compression level and threads only
	 * and receive properties in lzma_properties */

	lzma_ret = LzmaCompress(c_buf, &dlen, cthread->s_buf,
		(size_t)cthread->s_len, lzma_properties, &prop_size,
				lzma_level,
				0, /* dict size. set default, choose by level */
				-1, -1, -1, -1, /* lc, lp, pb, fb */
				control->threads);
	if (lzma_ret != SZ_OK) {
		/* Report what went wrong; SZ_ERROR_MEM and
		 * SZ_ERROR_OUTPUT_EOF are recoverable, the rest are not. */
		switch (lzma_ret) {
			case SZ_ERROR_MEM:
				break;
			case SZ_ERROR_PARAM:
				print_err("LZMA Parameter ERROR: %d. This should not happen.\n", SZ_ERROR_PARAM);
				break;
			case SZ_ERROR_OUTPUT_EOF:
				print_maxverbose("Harmless LZMA Output Buffer Overflow error: %d. Incompressible block.\n", SZ_ERROR_OUTPUT_EOF);
				break;
			case SZ_ERROR_THREAD:
				print_err("LZMA Multi Thread ERROR: %d. This should not happen.\n", SZ_ERROR_THREAD);
				break;
			default:
				print_err("Unidentified LZMA ERROR: %d. This should not happen.\n", lzma_ret);
				break;
		}
		/* can pass -1 if not compressible! Thanks Lasse Collin */
		free(c_buf);
		if (lzma_ret == SZ_ERROR_MEM) {
			/* Out of memory: retry with a smaller compression
			 * window by dropping one level at a time. */
			if (lzma_level > 1) {
				lzma_level--;
				print_verbose("LZMA Warning: %d. Can't allocate enough RAM for compression window, trying smaller.\n", SZ_ERROR_MEM);
				goto retry;
			}
			/* lzma compress can be fragile on 32 bit. If it fails,
			 * fall back to bzip2 compression so the block doesn't
			 * remain uncompressed */
			print_verbose("Unable to allocate enough RAM for any sized compression window, falling back to bzip2 compression.\n");
			return bzip2_compress_buf(control, cthread);
		} else if (lzma_ret == SZ_ERROR_OUTPUT_EOF)
			return 0;	/* incompressible: leave CTYPE_NONE */
		return -1;
	}

	if (unlikely((i64)dlen >= cthread->c_len)) {
		/* Incompressible, leave as CTYPE_NONE */
		print_maxverbose("Incompressible block\n");
		free(c_buf);
		return 0;
	}

	/* Make sure multiple threads don't race on writing lzma_properties */
	lock_mutex(control, &control->control_lock);
	if (!control->lzma_prop_set) {
		memcpy(control->lzma_properties, lzma_properties, 5);
		control->lzma_prop_set = true;
		/* Reset the magic written flag so we write it again if we
		 * get lzma properties and haven't written them yet. */
		if (TMP_OUTBUF)
			control->magic_written = 0;
	}
	unlock_mutex(control, &control->control_lock);

	/* Success: take ownership of the compressed buffer. */
	cthread->c_len = dlen;
	free(cthread->s_buf);
	cthread->s_buf = c_buf;
	cthread->c_type = CTYPE_LZMA;
	return 0;
}
389 
/* Compress a thread's block with LZO1X-1 (the fast back end).
 * On success s_buf is swapped for the compressed buffer. Incompressible
 * data is left untouched as CTYPE_NONE. Returns 0 on success/skip, -1 on
 * allocation failure.
 * Fixes: the c_buf allocation failure message was missing its trailing
 * newline, and "wkmem" was a typo for "wrkmem". */
static int lzo_compress_buf(rzip_control *control, struct compress_thread *cthread)
{
	lzo_uint in_len = cthread->s_len;
	/* LZO worst case output size: len + len/16 + 64 + 3, page rounded. */
	lzo_uint dlen = round_up_page(control, in_len + in_len / 16 + 64 + 3);
	lzo_bytep wrkmem;
	uchar *c_buf;
	int ret = -1;

	wrkmem = (lzo_bytep) calloc(1, LZO1X_1_MEM_COMPRESS);
	if (unlikely(wrkmem == NULL)) {
		print_maxverbose("Failed to malloc wrkmem\n");
		return ret;
	}

	c_buf = malloc(dlen);
	if (!c_buf) {
		print_err("Unable to allocate c_buf in lzo_compress_buf\n");
		goto out_free;
	}

	/* lzo1x_1_compress does not return anything but LZO_OK so we ignore
	 * the return value */
	lzo1x_1_compress(cthread->s_buf, in_len, c_buf, &dlen, wrkmem);
	ret = 0;

	if (dlen >= in_len){
		/* Incompressible, leave as CTYPE_NONE */
		print_maxverbose("Incompressible block\n");
		free(c_buf);
		goto out_free;
	}

	/* Success: take ownership of the compressed buffer. */
	cthread->c_len = dlen;
	free(cthread->s_buf);
	cthread->s_buf = c_buf;
	cthread->c_type = CTYPE_LZO;
out_free:
	free(wrkmem);
	return ret;
}
430 
431 /*
432   ***** DECOMPRESSION FUNCTIONS *****
433 
434   ZPAQ, BZIP, GZIP, LZMA, LZO
435 
436   try to decompress a buffer. Return 0 on success and -1 on failure.
437 */
/* Decompress a ZPAQ block in place: s_buf holds compressed data on entry
 * and the uncompressed data on success. Returns 0 on success, -1 on
 * failure (in which case s_buf still points at a valid buffer).
 * Fixes: i64 lengths were printed with %ld (wrong on 32-bit/Windows);
 * on a length mismatch the old code freed c_buf and then restored
 * s_buf = c_buf, leaving a dangling pointer. */
static int zpaq_decompress_buf(rzip_control *control __UNUSED__, struct uncomp_thread *ucthread, long thread)
{
	i64 dlen = ucthread->u_len;
	uchar *c_buf;

	c_buf = ucthread->s_buf;
	ucthread->s_buf = malloc(round_up_page(control, dlen));
	if (unlikely(!ucthread->s_buf)) {
		print_err("Failed to allocate %lld bytes for decompression\n", dlen);
		/* Restore the compressed buffer so the caller still owns
		 * valid memory. */
		ucthread->s_buf = c_buf;
		return -1;
	}

	dlen = 0;
	zpaq_decompress(ucthread->s_buf, &dlen, c_buf, ucthread->c_len,
			control->msgout, SHOW_PROGRESS ? true: false, thread);

	if (unlikely(dlen != ucthread->u_len)) {
		print_err("Inconsistent length after decompression. Got %lld bytes, expected %lld\n", dlen, ucthread->u_len);
		/* Keep the (valid) decompressed buffer in s_buf rather than
		 * restoring the freed compressed one. */
		free(c_buf);
		return -1;
	}

	free(c_buf);
	return 0;
}
467 
/* Decompress a bzip2 block in place: s_buf holds compressed data on entry
 * and the uncompressed data on success. Returns 0 on success, -1 on
 * failure (s_buf always left pointing at valid memory).
 * Fixes: u32 lengths were printed with %d instead of %u; on a length
 * mismatch the old code freed c_buf and then restored s_buf = c_buf,
 * leaving a dangling pointer. */
static int bzip2_decompress_buf(rzip_control *control __UNUSED__, struct uncomp_thread *ucthread)
{
	u32 dlen = ucthread->u_len;
	int bzerr;
	uchar *c_buf;

	c_buf = ucthread->s_buf;
	ucthread->s_buf = malloc(round_up_page(control, dlen));
	if (unlikely(!ucthread->s_buf)) {
		print_err("Failed to allocate %u bytes for decompression\n", dlen);
		/* Restore so the caller still owns valid memory. */
		ucthread->s_buf = c_buf;
		return -1;
	}

	bzerr = BZ2_bzBuffToBuffDecompress((char*)ucthread->s_buf, &dlen, (char*)c_buf, ucthread->c_len, 0, 0);
	if (unlikely(bzerr != BZ_OK)) {
		print_err("Failed to decompress buffer - bzerr=%d\n", bzerr);
		free(ucthread->s_buf);
		ucthread->s_buf = c_buf;
		return -1;
	}

	if (unlikely(dlen != ucthread->u_len)) {
		print_err("Inconsistent length after decompression. Got %u bytes, expected %lld\n", dlen, ucthread->u_len);
		/* Keep the (valid) decompressed buffer in s_buf rather than
		 * restoring the freed compressed one. */
		free(c_buf);
		return -1;
	}

	free(c_buf);
	return 0;
}
502 
/* Decompress a zlib/gzip block in place: s_buf holds compressed data on
 * entry and the uncompressed data on success. Returns 0 on success, -1 on
 * failure (s_buf always left pointing at valid memory).
 * Fixes: unsigned long lengths were printed with %ld instead of %lu; on a
 * length mismatch the old code freed c_buf and then restored
 * s_buf = c_buf, leaving a dangling pointer. */
static int gzip_decompress_buf(rzip_control *control __UNUSED__, struct uncomp_thread *ucthread)
{
	unsigned long dlen = ucthread->u_len;
	int gzerr;
	uchar *c_buf;

	c_buf = ucthread->s_buf;
	ucthread->s_buf = malloc(round_up_page(control, dlen));
	if (unlikely(!ucthread->s_buf)) {
		print_err("Failed to allocate %lu bytes for decompression\n", dlen);
		/* Restore so the caller still owns valid memory. */
		ucthread->s_buf = c_buf;
		return -1;
	}

	gzerr = uncompress(ucthread->s_buf, &dlen, c_buf, ucthread->c_len);
	if (unlikely(gzerr != Z_OK)) {
		print_err("Failed to decompress buffer - gzerr=%d\n", gzerr);
		free(ucthread->s_buf);
		ucthread->s_buf = c_buf;
		return -1;
	}

	if (unlikely((i64)dlen != ucthread->u_len)) {
		print_err("Inconsistent length after decompression. Got %lu bytes, expected %lld\n", dlen, ucthread->u_len);
		/* Keep the (valid) decompressed buffer in s_buf rather than
		 * restoring the freed compressed one. */
		free(c_buf);
		return -1;
	}

	free(c_buf);
	return 0;
}
537 
/* Decompress an LZMA block in place using the stream-wide
 * control->lzma_properties recorded at compression time. s_buf holds
 * compressed data on entry and the uncompressed data on success. Returns
 * 0 on success, -1 on failure (s_buf always left pointing at valid
 * memory).
 * Fix: on a length mismatch the old code freed c_buf and then restored
 * s_buf = c_buf, leaving a dangling pointer. */
static int lzma_decompress_buf(rzip_control *control, struct uncomp_thread *ucthread)
{
	size_t dlen = ucthread->u_len;
	int lzmaerr;
	uchar *c_buf;
	SizeT c_len = ucthread->c_len;

	c_buf = ucthread->s_buf;
	ucthread->s_buf = malloc(round_up_page(control, dlen));
	if (unlikely(!ucthread->s_buf)) {
		print_err("Failed to allocate %lld bytes for decompression\n", (i64)dlen);
		/* Restore so the caller still owns valid memory. */
		ucthread->s_buf = c_buf;
		return -1;
	}

	/* With LZMA SDK 4.63 we pass control->lzma_properties
	 * which is needed for proper uncompress */
	lzmaerr = LzmaUncompress(ucthread->s_buf, &dlen, c_buf, &c_len, control->lzma_properties, 5);
	if (unlikely(lzmaerr)) {
		print_err("Failed to decompress buffer - lzmaerr=%d\n", lzmaerr);
		free(ucthread->s_buf);
		ucthread->s_buf = c_buf;
		return -1;
	}

	if (unlikely((i64)dlen != ucthread->u_len)) {
		print_err("Inconsistent length after decompression. Got %lld bytes, expected %lld\n", (i64)dlen, ucthread->u_len);
		/* Keep the (valid) decompressed buffer in s_buf rather than
		 * restoring the freed compressed one. */
		free(c_buf);
		return -1;
	}

	free(c_buf);
	return 0;
}
575 
/* Decompress an LZO1X block in place: s_buf holds compressed data on
 * entry and the uncompressed data on success. Returns 0 on success, -1 on
 * failure (s_buf always left pointing at valid memory).
 * Fix: on a length mismatch the old code freed c_buf and then restored
 * s_buf = c_buf, leaving a dangling pointer. */
static int lzo_decompress_buf(rzip_control *control __UNUSED__, struct uncomp_thread *ucthread)
{
	lzo_uint dlen = ucthread->u_len;
	int lzerr;
	uchar *c_buf;

	c_buf = ucthread->s_buf;
	ucthread->s_buf = malloc(round_up_page(control, dlen));
	if (unlikely(!ucthread->s_buf)) {
		print_err("Failed to allocate %lu bytes for decompression\n", (unsigned long)dlen);
		/* Restore so the caller still owns valid memory. */
		ucthread->s_buf = c_buf;
		return -1;
	}

	lzerr = lzo1x_decompress((uchar*)c_buf, ucthread->c_len, (uchar*)ucthread->s_buf, &dlen, NULL);
	if (unlikely(lzerr != LZO_E_OK)) {
		print_err("Failed to decompress buffer - lzerr=%d\n", lzerr);
		free(ucthread->s_buf);
		ucthread->s_buf = c_buf;
		return -1;
	}

	if (unlikely((i64)dlen != ucthread->u_len)) {
		print_err("Inconsistent length after decompression. Got %lu bytes, expected %lld\n", (unsigned long)dlen, ucthread->u_len);
		/* Keep the (valid) decompressed buffer in s_buf rather than
		 * restoring the freed compressed one. */
		free(c_buf);
		return -1;
	}

	free(c_buf);
	return 0;
}
610 
611 /* WORK FUNCTIONS */
612 
613 /* Look at whether we're writing to a ram location or physical files and write
614  * the data accordingly. */
/* Look at whether we're writing to a ram location or physical files and write
 * the data accordingly. Returns the number of bytes written (ret) on
 * success, -1 on failure. When the in-ram output buffer would overflow,
 * flushes it to fd_out and falls back to file output permanently. */
ssize_t put_fdout(rzip_control *control, void *offset_buf, ssize_t ret)
{
	if (!TMP_OUTBUF)
		return write(control->fd_out, offset_buf, (size_t)ret);

	if (unlikely(control->out_ofs + ret > control->out_maxlen)) {
		/* The data won't fit in a temporary output buffer so we have
		 * to fall back to temporary files. */
		print_verbose("Unable to decompress entirely in ram, will use physical files\n");
		if (unlikely(control->fd_out == -1))
			failure("Was unable to decompress entirely in ram and no temporary file creation was possible\n");
		/* Flush everything buffered so far before the new data. */
		if (unlikely(!write_fdout(control, control->tmp_outbuf, control->out_len))) {
			print_err("Unable to write_fdout tmpoutbuf in put_fdout\n");
			return -1;
		}
		close_tmpoutbuf(control);
		if (unlikely(!write_fdout(control, offset_buf, ret))) {
			print_err("Unable to write_fdout offset_buf in put_fdout\n");
			return -1;
		}
		return ret;
	}
	/* Plenty of room: append to the in-ram buffer and advance the
	 * offset, tracking the high-water mark in out_len. */
	memcpy(control->tmp_outbuf + control->out_ofs, offset_buf, ret);
	control->out_ofs += ret;
	if (likely(control->out_ofs > control->out_len))
		control->out_len = control->out_ofs;
	return ret;
}
643 
644 /* This is a custom version of write() which writes in 1GB chunks to avoid
645    the overflows at the >= 2GB mark thanks to 32bit fuckage. This should help
646    even on the rare occasion write() fails to write 1GB as well. */
/* This is a custom version of write() which writes in 1GB chunks to avoid
   the overflows at the >= 2GB mark thanks to 32bit fuckage. This should help
   even on the rare occasion write() fails to write 1GB as well. */
ssize_t write_1g(rzip_control *control, void *buf, i64 len)
{
	uchar *p = buf;
	i64 written = 0;

	while (len > 0) {
		ssize_t chunk = MIN(len, one_g);

		chunk = put_fdout(control, p, (size_t)chunk);
		if (unlikely(chunk <= 0))
			return chunk;
		p += chunk;
		len -= chunk;
		written += chunk;
	}
	return written;
}
665 
666 /* Should be called only if we know the buffer will be large enough, otherwise
667  * we must dump_stdin first */
read_fdin(struct rzip_control * control,i64 len)668 static bool read_fdin(struct rzip_control *control, i64 len)
669 {
670 	int tmpchar;
671 	i64 i;
672 
673 	for (i = 0; i < len; i++) {
674 		tmpchar = getchar();
675 		if (unlikely(tmpchar == EOF))
676 			failure_return(("Reached end of file on STDIN prematurely on read_fdin, asked for %lld got %lld\n",
677 				len, i), false);
678 		control->tmp_inbuf[control->in_ofs + i] = (char)tmpchar;
679 	}
680 	control->in_len = control->in_ofs + len;
681 	return true;
682 }
683 
684 /* Dump STDIN into a temporary file */
/* Dump STDIN into a temporary file and switch input over to it. Returns
 * 0 on success, -1 on failure. */
static int dump_stdin(rzip_control *control)
{
	if (unlikely(!write_fdin(control)) ||
	    unlikely(!read_tmpinfile(control, control->fd_in)))
		return -1;
	close_tmpinbuf(control);
	return 0;
}
694 
695 /* Ditto for read */
/* Chunked counterpart to write_1g: read len bytes in at most 1GB slices,
 * transparently serving reads from the in-ram STDIN buffer (TMP_INBUF) or
 * the in-ram output buffer (TMP_OUTBUF) when active. Returns the total
 * read, or <= 0 on error.
 * Fix: the STDIN-overflow failure message contained "%compress", whose
 * "%c" is a printf conversion with no matching argument — undefined
 * behavior. Since this path only runs while decompressing from STDIN,
 * it now says "decompress". */
ssize_t read_1g(rzip_control *control, int fd, void *buf, i64 len)
{
	uchar *offset_buf = buf;
	ssize_t ret;
	i64 total;

	if (TMP_INBUF && fd == control->fd_in) {
		/* We're decompressing from STDIN */
		if (unlikely(control->in_ofs + len > control->in_maxlen)) {
			/* We're unable to fit it all into the temp buffer */
			if (dump_stdin(control))
				failure_return(("Inadequate ram to decompress from STDIN and unable to create in tmpfile"), -1);
			goto read_fd;
		}
		/* Pull in any bytes not yet buffered before copying out. */
		if (control->in_ofs + len > control->in_len) {
			if (unlikely(!read_fdin(control, control->in_ofs + len - control->in_len)))
				return false;
		}
		memcpy(buf, control->tmp_inbuf + control->in_ofs, len);
		control->in_ofs += len;
		return len;
	}

	if (TMP_OUTBUF && fd == control->fd_out) {
		if (unlikely(control->out_ofs + len > control->out_maxlen))
			failure_return(("Trying to read beyond out_ofs in tmpoutbuf\n"), -1);
		memcpy(buf, control->tmp_outbuf + control->out_ofs, len);
		control->out_ofs += len;
		return len;
	}

read_fd:
	total = 0;
	while (len > 0) {
		ret = MIN(len, one_g);
		ret = read(fd, offset_buf, (size_t)ret);
		if (unlikely(ret <= 0))
			return ret;
		len -= ret;
		offset_buf += ret;
		total += ret;
	}
	return total;
}
740 
741 /* write to a file, return 0 on success and -1 on failure */
/* write to a file, return 0 on success and -1 on failure.
 * Fix: len was cast to size_t before being passed to write_1g, whose
 * parameter is i64 — on 32-bit platforms that truncates lengths >= 4GB
 * before write_1g can chunk them. Pass the i64 straight through. */
static int write_buf(rzip_control *control, uchar *p, i64 len)
{
	ssize_t ret;

	ret = write_1g(control, p, len);
	if (unlikely(ret == -1)) {
		print_err("Write of length %lld failed - %s\n", len, strerror(errno));
		return -1;
	}
	if (unlikely(ret != (ssize_t)len)) {
		print_err("Partial write!? asked for %lld bytes but got %lld\n", len, (i64)ret);
		return -1;
	}
	return 0;
}
757 
758 /* write a byte */
/* Write a single byte to the output stream; no endian handling needed. */
static inline int write_u8(rzip_control *control, uchar v)
{
	return write_buf(control, &v, 1);
}
763 
/* Write the low `len` bytes of v in little-endian order. Converting with
 * htole64 first means the first `len` bytes of the in-memory value are
 * always the least significant ones, regardless of host endianness. */
static inline int write_val(rzip_control *control, i64 v, int len)
{
	v = htole64(v);
	return write_buf(control, (uchar *)&v, len);
}
769 
/* Read exactly len bytes into p; return 0 on success and -1 on failure
 * (including short reads).
 * Fix: len was cast to size_t before being passed to read_1g, whose
 * parameter is i64 — on 32-bit platforms that truncates lengths >= 4GB
 * before read_1g can chunk them. Pass the i64 straight through. */
static int read_buf(rzip_control *control, int f, uchar *p, i64 len)
{
	ssize_t ret;

	ret = read_1g(control, f, p, len);
	if (unlikely(ret == -1)) {
		print_err("Read of length %lld failed - %s\n", len, strerror(errno));
		return -1;
	}
	if (unlikely(ret != (ssize_t)len)) {
		print_err("Partial read!? asked for %lld bytes but got %lld\n", len, (i64)ret);
		return -1;
	}
	return 0;
}
785 
/* Read a single byte from the stream into *v. */
static inline int read_u8(rzip_control *control, int f, uchar *v)
{
	return read_buf(control, f, v, 1);
}
790 
/* Read a 32-bit little-endian value from the stream into *v, converting
 * to host byte order. Returns read_buf's result (0 or -1). */
static inline int read_u32(rzip_control *control, int f, u32 *v)
{
	int ret = read_buf(control, f, (uchar *)v, 4);

	/* Note: converted even if the read failed; callers must check ret. */
	*v = le32toh(*v);
	return ret;
}
798 
/* Read `len` little-endian bytes into the low bytes of *v. The on-disk
 * value was produced by write_val, so the bytes land in the least
 * significant positions.
 * NOTE(review): the stored bytes are interpreted as-is; presumably this
 * relies on a little-endian host or a later le64toh by the caller —
 * confirm against the callers. */
static inline int read_val(rzip_control *control, int f, i64 *v, int len)
{
	int ret;

	/* We only partially read all 8 bytes so have to zero v here */
	*v = 0;
	ret = read_buf(control, f, (uchar *)v, len);
	return ret;
}
808 
/* Seek the stream's fd to absolute position spos; `pos` is the logical
 * position, reported on error. Returns 0 on success, -1 on failure. */
static int fd_seekto(rzip_control *control, struct stream_info *sinfo, i64 spos, i64 pos)
{
	if (likely(lseek(sinfo->fd, spos, SEEK_SET) == spos))
		return 0;
	print_err("Failed to seek to %lld in stream\n", pos);
	return -1;
}
817 
818 /* seek to a position within a set of streams - return -1 on failure */
/* seek to a position within a set of streams - return -1 on failure.
 * When output lives in the in-ram buffer, this just moves out_ofs.
 * Fix: the old code assigned control->out_ofs BEFORE validating the
 * target, leaving an out-of-range offset behind on failure; validate
 * first. */
static int seekto(rzip_control *control, struct stream_info *sinfo, i64 pos)
{
	i64 spos = pos + sinfo->initial_pos;

	if (TMP_OUTBUF) {
		spos -= control->out_relofs;
		if (unlikely(spos > control->out_len || spos < 0)) {
			print_err("Trying to seek to %lld outside tmp outbuf in seekto\n", spos);
			return -1;
		}
		control->out_ofs = spos;
		return 0;
	}

	return fd_seekto(control, sinfo, spos, pos);
}
835 
/* Seek the read position within a stream set. When input is buffered in
 * ram (TMP_INBUF), data between the current buffered end and the target
 * is pulled from STDIN first; if it cannot fit, STDIN is dumped to a
 * temp file and we fall through to a real fd seek. Returns 0 / -1. */
static int read_seekto(rzip_control *control, struct stream_info *sinfo, i64 pos)
{
	i64 spos = pos + sinfo->initial_pos;

	if (TMP_INBUF) {
		if (spos > control->in_len) {
			i64 len = spos - control->in_len;

			if (control->in_ofs + len > control->in_maxlen) {
				/* Target beyond buffer capacity: spill
				 * STDIN to a file and seek that instead. */
				if (unlikely(dump_stdin(control)))
					return -1;
				goto fd_seek;
			} else {
				if (unlikely(!read_fdin(control, len)))
					return -1;
			}
		}
		/* NOTE(review): in_ofs is set before the spos < 0 check, so
		 * a negative seek leaves in_ofs negative on the error path —
		 * callers treat -1 as fatal, so presumably harmless. */
		control->in_ofs = spos;
		if (unlikely(spos < 0)) {
			print_err("Trying to seek to %lld outside tmp inbuf in read_seekto\n", spos);
			return -1;
		}
		return 0;
	}
fd_seek:
	return fd_seekto(control, sinfo, spos, pos);
}
863 
get_seek(rzip_control * control,int fd)864 static i64 get_seek(rzip_control *control, int fd)
865 {
866 	i64 ret;
867 
868 	if (TMP_OUTBUF)
869 		return control->out_relofs + control->out_ofs;
870 	ret = lseek(fd, 0, SEEK_CUR);
871 	if (unlikely(ret == -1))
872 		fatal_return(("Failed to lseek in get_seek\n"), -1);
873 	return ret;
874 }
875 
get_readseek(rzip_control * control,int fd)876 i64 get_readseek(rzip_control *control, int fd)
877 {
878 	i64 ret;
879 
880 	if (TMP_INBUF)
881 		return control->in_ofs;
882 	ret = lseek(fd, 0, SEEK_CUR);
883 	if (unlikely(ret == -1))
884 		fatal_return(("Failed to lseek in get_seek\n"), -1);
885 	return ret;
886 }
887 
prepare_streamout_threads(rzip_control * control)888 bool prepare_streamout_threads(rzip_control *control)
889 {
890 	int i;
891 
892 	/* As we serialise the generation of threads during the rzip
893 	 * pre-processing stage, it's faster to have one more thread available
894 	 * to keep all CPUs busy. There is no point splitting up the chunks
895 	 * into multiple threads if there will be no compression back end. */
896 	if (control->threads > 1)
897 		++control->threads;
898 	if (NO_COMPRESS)
899 		control->threads = 1;
900 	threads = calloc(sizeof(pthread_t), control->threads);
901 	if (unlikely(!threads))
902 		fatal_return(("Unable to calloc threads in prepare_streamout_threads\n"), false);
903 
904 	cthread = calloc(sizeof(struct compress_thread), control->threads);
905 	if (unlikely(!cthread)) {
906 		free(threads);
907 		fatal_return(("Unable to calloc cthread in prepare_streamout_threads\n"), false);
908 	}
909 
910 	for (i = 0; i < control->threads; i++) {
911 		cksem_init(control, &cthread[i].cksem);
912 		cksem_post(control, &cthread[i].cksem);
913 	}
914 	return true;
915 }
916 
917 
close_streamout_threads(rzip_control * control)918 bool close_streamout_threads(rzip_control *control)
919 {
920 	int i, close_thread = output_thread;
921 
922 	/* Wait for the threads in the correct order in case they end up
923 	 * serialised */
924 	for (i = 0; i < control->threads; i++) {
925 		cksem_wait(control, &cthread[close_thread].cksem);
926 
927 		if (++close_thread == control->threads)
928 			close_thread = 0;
929 	}
930 	free(cthread);
931 	free(threads);
932 	return true;
933 }
934 
935 /* open a set of output streams, compressing with the given
936    compression level and algorithm */
open_stream_out(rzip_control * control,int f,unsigned int n,i64 chunk_limit,char cbytes)937 void *open_stream_out(rzip_control *control, int f, unsigned int n, i64 chunk_limit, char cbytes)
938 {
939 	struct stream_info *sinfo;
940 	i64 testsize, limit;
941 	uchar *testmalloc;
942 	unsigned int i, testbufs;
943 
944 	sinfo = calloc(sizeof(struct stream_info), 1);
945 	if (unlikely(!sinfo))
946 		return NULL;
947 	if (chunk_limit < control->page_size)
948 		chunk_limit = control->page_size;
949 	sinfo->bufsize = sinfo->size = limit = chunk_limit;
950 
951 	sinfo->chunk_bytes = cbytes;
952 	sinfo->num_streams = n;
953 	sinfo->fd = f;
954 
955 	sinfo->s = calloc(sizeof(struct stream), n);
956 	if (unlikely(!sinfo->s)) {
957 		free(sinfo);
958 		return NULL;
959 	}
960 
961 	/* Find the largest we can make the window based on ability to malloc
962 	 * ram. We need 2 buffers for each compression thread and the overhead
963 	 * of each compression back end. No 2nd buf is required when there is
964 	 * no back end compression. We limit the total regardless to 1/3 ram
965 	 * for when the OS lies due to heavy overcommit. */
966 	if (NO_COMPRESS)
967 		testbufs = 1;
968 	else
969 		testbufs = 2;
970 
971 	testsize = (limit * testbufs) + (control->overhead * control->threads);
972 	if (testsize > control->usable_ram)
973 		limit = (control->usable_ram - (control->overhead * control->threads)) / testbufs;
974 
975 	/* If we don't have enough ram for the number of threads, decrease the
976 	 * number of threads till we do, or only have one thread. */
977 	while (limit < STREAM_BUFSIZE && limit < chunk_limit) {
978 		if (control->threads > 1)
979 			--control->threads;
980 		else
981 			break;
982 		limit = (control->usable_ram - (control->overhead * control->threads)) / testbufs;
983 		limit = MIN(limit, chunk_limit);
984 	}
985 	if (BITS32) {
986 		limit = MIN(limit, one_g);
987 		if (limit + (control->overhead * control->threads) > one_g)
988 			limit = one_g - (control->overhead * control->threads);
989 	}
990 	/* Use a nominal minimum size should we fail all previous shrinking */
991 	limit = MAX(limit, STREAM_BUFSIZE);
992 	limit = MIN(limit, chunk_limit);
993 retest_malloc:
994 	testsize = limit + (control->overhead * control->threads);
995 	testmalloc = malloc(testsize);
996 	if (!testmalloc) {
997 		limit = limit / 10 * 9;
998 		goto retest_malloc;
999 	}
1000 	if (!NO_COMPRESS) {
1001 		char *testmalloc2 = malloc(limit);
1002 
1003 		if (!testmalloc2) {
1004 			free(testmalloc);
1005 			limit = limit / 10 * 9;
1006 			goto retest_malloc;
1007 		}
1008 		free(testmalloc2);
1009 	}
1010 	free(testmalloc);
1011 	print_maxverbose("Succeeded in testing %lld sized malloc for back end compression\n", testsize);
1012 
1013 	/* Make the bufsize no smaller than STREAM_BUFSIZE. Round up the
1014 	 * bufsize to fit X threads into it */
1015 	sinfo->bufsize = MIN(limit, MAX((limit + control->threads - 1) / control->threads,
1016 					STREAM_BUFSIZE));
1017 
1018 	if (control->threads > 1)
1019 		print_maxverbose("Using up to %d threads to compress up to %lld bytes each.\n",
1020 			control->threads, sinfo->bufsize);
1021 	else
1022 		print_maxverbose("Using only 1 thread to compress up to %lld bytes\n",
1023 			sinfo->bufsize);
1024 
1025 	for (i = 0; i < n; i++) {
1026 		sinfo->s[i].buf = calloc(sinfo->bufsize , 1);
1027 		if (unlikely(!sinfo->s[i].buf)) {
1028 			fatal("Unable to malloc buffer of size %lld in open_stream_out\n", sinfo->bufsize);
1029 			free(sinfo->s);
1030 			free(sinfo);
1031 			return NULL;
1032 		}
1033 	}
1034 
1035 	return (void *)sinfo;
1036 }
1037 
1038 /* The block headers are all encrypted so we read the data and salt associated
1039  * with them, decrypt the data, then return the decrypted version of the
1040  * values */
decrypt_header(rzip_control * control,uchar * head,uchar * c_type,i64 * c_len,i64 * u_len,i64 * last_head)1041 static bool decrypt_header(rzip_control *control, uchar *head, uchar *c_type,
1042 			   i64 *c_len, i64 *u_len, i64 *last_head)
1043 {
1044 	uchar *buf = head + SALT_LEN;
1045 
1046 	memcpy(buf, c_type, 1);
1047 	memcpy(buf + 1, c_len, 8);
1048 	memcpy(buf + 9, u_len, 8);
1049 	memcpy(buf + 17, last_head, 8);
1050 
1051 	if (unlikely(!lrz_decrypt(control, buf, 25, head)))
1052 		return false;
1053 
1054 	memcpy(c_type, buf, 1);
1055 	memcpy(c_len, buf + 1, 8);
1056 	memcpy(u_len, buf + 9, 8);
1057 	memcpy(last_head, buf + 17, 8);
1058 	return true;
1059 }
1060 
1061 /* prepare a set of n streams for reading on file descriptor f */
open_stream_in(rzip_control * control,int f,int n,char chunk_bytes)1062 void *open_stream_in(rzip_control *control, int f, int n, char chunk_bytes)
1063 {
1064 	struct stream_info *sinfo;
1065 	int total_threads, i;
1066 	i64 header_length;
1067 
1068 	sinfo = calloc(sizeof(struct stream_info), 1);
1069 	if (unlikely(!sinfo))
1070 		return NULL;
1071 
1072 	/* We have one thread dedicated to stream 0, and one more thread than
1073 	 * CPUs to keep them busy, unless we're running single-threaded. */
1074 	if (control->threads > 1)
1075 		total_threads = control->threads + 2;
1076 	else
1077 		total_threads = control->threads + 1;
1078 	threads = calloc(sizeof(pthread_t), total_threads);
1079 	if (unlikely(!threads))
1080 		return NULL;
1081 
1082 	ucthread = calloc(sizeof(struct uncomp_thread), total_threads);
1083 	if (unlikely(!ucthread)) {
1084 		free(sinfo);
1085 		free(threads);
1086 		fatal_return(("Unable to calloc cthread in open_stream_in\n"), NULL);
1087 	}
1088 
1089 	sinfo->num_streams = n;
1090 	sinfo->fd = f;
1091 	sinfo->chunk_bytes = chunk_bytes;
1092 
1093 	sinfo->s = calloc(sizeof(struct stream), n);
1094 	if (unlikely(!sinfo->s)) {
1095 		free(sinfo);
1096 		return NULL;
1097 	}
1098 
1099 	sinfo->s[0].total_threads = 1;
1100 	sinfo->s[1].total_threads = total_threads - 1;
1101 
1102 	if (control->major_version == 0 && control->minor_version > 5) {
1103 		/* Read in flag that tells us if there are more chunks after
1104 		 * this. Ignored if we know the final file size */
1105 		print_maxverbose("Reading eof flag at %lld\n", get_readseek(control, f));
1106 		if (unlikely(read_u8(control, f, &control->eof))) {
1107 			print_err("Failed to read eof flag in open_stream_in\n");
1108 			goto failed;
1109 		}
1110 		print_maxverbose("EOF: %d\n", control->eof);
1111 
1112 		/* Read in the expected chunk size */
1113 		if (!ENCRYPT) {
1114 			print_maxverbose("Reading expected chunksize at %lld\n", get_readseek(control, f));
1115 			if (unlikely(read_val(control, f, &sinfo->size, sinfo->chunk_bytes))) {
1116 				print_err("Failed to read in chunk size in open_stream_in\n");
1117 				goto failed;
1118 			}
1119 			sinfo->size = le64toh(sinfo->size);
1120 			print_maxverbose("Chunk size: %lld\n", sinfo->size);
1121 			control->st_size += sinfo->size;
1122 		}
1123 	}
1124 	sinfo->initial_pos = get_readseek(control, f);
1125 	if (unlikely(sinfo->initial_pos == -1))
1126 		goto failed;
1127 
1128 	for (i = 0; i < n; i++) {
1129 		uchar c, enc_head[25 + SALT_LEN];
1130 		i64 v1, v2;
1131 
1132 		sinfo->s[i].base_thread = i;
1133 		sinfo->s[i].uthread_no = sinfo->s[i].base_thread;
1134 		sinfo->s[i].unext_thread = sinfo->s[i].base_thread;
1135 
1136 		if (unlikely(ENCRYPT && read_buf(control, f, enc_head, SALT_LEN)))
1137 			goto failed;
1138 again:
1139 		if (unlikely(read_u8(control, f, &c)))
1140 			goto failed;
1141 
1142 		/* Compatibility crap for versions < 0.40 */
1143 		if (control->major_version == 0 && control->minor_version < 4) {
1144 			u32 v132, v232, last_head32;
1145 
1146 			if (unlikely(read_u32(control, f, &v132)))
1147 				goto failed;
1148 			if (unlikely(read_u32(control, f, &v232)))
1149 				goto failed;
1150 			if (unlikely(read_u32(control, f, &last_head32)))
1151 				goto failed;
1152 
1153 			v1 = v132;
1154 			v2 = v232;
1155 			sinfo->s[i].last_head = last_head32;
1156 			header_length = 13;
1157 		} else {
1158 			int read_len;
1159 
1160 			print_maxverbose("Reading stream %d header at %lld\n", i, get_readseek(control, f));
1161 			if ((control->major_version == 0 && control->minor_version < 6) ||
1162 				ENCRYPT)
1163 					read_len = 8;
1164 			else
1165 				read_len = sinfo->chunk_bytes;
1166 			if (unlikely(read_val(control, f, &v1, read_len)))
1167 				goto failed;
1168 			if (unlikely(read_val(control, f, &v2, read_len)))
1169 				goto failed;
1170 			if (unlikely(read_val(control, f, &sinfo->s[i].last_head, read_len)))
1171 				goto failed;
1172 			header_length = 1 + (read_len * 3);
1173 		}
1174 		sinfo->total_read += header_length;
1175 
1176 		if (ENCRYPT) {
1177 			if (unlikely(!decrypt_header(control, enc_head, &c, &v1, &v2, &sinfo->s[i].last_head)))
1178 				goto failed;
1179 			sinfo->total_read += SALT_LEN;
1180 		}
1181 
1182 		v1 = le64toh(v1);
1183 		v2 = le64toh(v2);
1184 		sinfo->s[i].last_head = le64toh(sinfo->s[i].last_head);
1185 
1186 		if (unlikely(c == CTYPE_NONE && v1 == 0 && v2 == 0 && sinfo->s[i].last_head == 0 && i == 0)) {
1187 			print_err("Enabling stream close workaround\n");
1188 			sinfo->initial_pos += header_length;
1189 			goto again;
1190 		}
1191 
1192 		if (unlikely(c != CTYPE_NONE)) {
1193 			print_err("Unexpected initial tag %d in streams\n", c);
1194 			if (ENCRYPT)
1195 				print_err("Wrong password?\n");
1196 			goto failed;
1197 		}
1198 		if (unlikely(v1)) {
1199 			print_err("Unexpected initial c_len %lld in streams %lld\n", v1, v2);
1200 			goto failed;
1201 		}
1202 		if (unlikely(v2)) {
1203 			print_err("Unexpected initial u_len %lld in streams\n", v2);
1204 			goto failed;
1205 		}
1206 	}
1207 
1208 	return (void *)sinfo;
1209 
1210 failed:
1211 	free(sinfo->s);
1212 	free(sinfo);
1213 	return NULL;
1214 }
1215 
1216 #define MIN_SIZE (ENCRYPT ? CBC_LEN : 0)
1217 
/* Once the final data has all been written to the block header, we go back
 * and write SALT_LEN bytes of salt before it, and encrypt the header in place
 * by reading what has been written, encrypting it, and writing back over it.
 * This is very convoluted depending on whether a last_head value is written
 * to this block or not. See the callers of this function */
static bool rewrite_encrypted(rzip_control *control, struct stream_info *sinfo, i64 ofs)
{
	uchar *buf, *head;
	i64 cur_ofs;

	/* Remember the current position so it can be restored on the way out. */
	cur_ofs = get_seek(control, sinfo->fd) - sinfo->initial_pos;
	if (unlikely(cur_ofs == -1))
		return false;
	/* head holds SALT_LEN bytes of fresh salt followed by the 25 byte
	 * block header (buf) that gets encrypted in place. */
	head = malloc(25 + SALT_LEN);
	if (unlikely(!head))
		fatal_return(("Failed to malloc head in rewrite_encrypted\n"), false);
	buf = head + SALT_LEN;
	if (unlikely(!get_rand(control, head, SALT_LEN)))
		goto error;
	/* Write the new salt into the SALT_LEN bytes preceding ofs... */
	if (unlikely(seekto(control, sinfo, ofs - SALT_LEN)))
		failure_goto(("Failed to seekto buf ofs in rewrite_encrypted\n"), error);
	if (unlikely(write_buf(control, head, SALT_LEN)))
		failure_goto(("Failed to write_buf head in rewrite_encrypted\n"), error);
	/* ...then read back the 25 byte plaintext header that follows it... */
	if (unlikely(read_buf(control, sinfo->fd, buf, 25)))
		failure_goto(("Failed to read_buf buf in rewrite_encrypted\n"), error);

	if (unlikely(!lrz_encrypt(control, buf, 25, head)))
		goto error;

	/* ...and overwrite it in place with the encrypted version. */
	if (unlikely(seekto(control, sinfo, ofs)))
		failure_goto(("Failed to seek back to ofs in rewrite_encrypted\n"), error);
	if (unlikely(write_buf(control, buf, 25)))
		failure_goto(("Failed to write_buf encrypted buf in rewrite_encrypted\n"), error);
	free(head);
	/* NOTE(review): the return value of this position-restoring seekto is
	 * ignored — presumably deliberate as the rewrite itself is already
	 * committed; confirm. */
	seekto(control, sinfo, cur_ofs);
	return true;
error:
	free(head);
	return false;
}
1258 
1259 /* Enter with s_buf allocated,s_buf points to the compressed data after the
1260  * backend compression and is then freed here */
compthread(void * data)1261 static void *compthread(void *data)
1262 {
1263 	stream_thread_struct *s = data;
1264 	rzip_control *control = s->control;
1265 	long i = s->i;
1266 	struct compress_thread *cti;
1267 	struct stream_info *ctis;
1268 	int waited = 0, ret = 0;
1269 	i64 padded_len;
1270 	int write_len;
1271 
1272 	/* Make sure this thread doesn't already exist */
1273 
1274 	free(data);
1275 	cti = &cthread[i];
1276 	ctis = cti->sinfo;
1277 
1278 	if (unlikely(setpriority(PRIO_PROCESS, 0, control->nice_val) == -1))
1279 		print_err("Warning, unable to set nice value on thread\n");
1280 
1281 	cti->c_type = CTYPE_NONE;
1282 	cti->c_len = cti->s_len;
1283 
1284 	/* Flushing writes to disk frees up any dirty ram, improving chances
1285 	 * of succeeding in allocating more ram */
1286 	fsync(ctis->fd);
1287 retry:
1288 	/* Very small buffers have issues to do with minimum amounts of ram
1289 	 * allocatable to a buffer combined with the MINIMUM_MATCH of rzip
1290 	 * being 31 bytes so don't bother trying to compress anything less
1291 	 * than 64 bytes. */
1292 	if (!NO_COMPRESS && cti->c_len >= 64) {
1293 		if (LZMA_COMPRESS)
1294 			ret = lzma_compress_buf(control, cti);
1295 		else if (LZO_COMPRESS)
1296 			ret = lzo_compress_buf(control, cti);
1297 		else if (BZIP2_COMPRESS)
1298 			ret = bzip2_compress_buf(control, cti);
1299 		else if (ZLIB_COMPRESS)
1300 			ret = gzip_compress_buf(control, cti);
1301 		else if (ZPAQ_COMPRESS)
1302 			ret = zpaq_compress_buf(control, cti, i);
1303 		else failure_goto(("Dunno wtf compression to use!\n"), error);
1304 	}
1305 
1306 	padded_len = cti->c_len;
1307 	if (!ret && padded_len < MIN_SIZE) {
1308 		/* We need to pad out each block to at least be CBC_LEN bytes
1309 		 * long or encryption cannot work. We pad it with random
1310 		 * data */
1311 		padded_len = MIN_SIZE;
1312 		cti->s_buf = realloc(cti->s_buf, MIN_SIZE);
1313 		if (unlikely(!cti->s_buf))
1314 			fatal_goto(("Failed to realloc s_buf in compthread\n"), error);
1315 		if (unlikely(!get_rand(control, cti->s_buf + cti->c_len, MIN_SIZE - cti->c_len)))
1316 			goto error;
1317 	}
1318 
1319 	/* If compression fails for whatever reason multithreaded, then wait
1320 	 * for the previous thread to finish, serialising the work to decrease
1321 	 * the memory requirements, increasing the chance of success */
1322 	if (unlikely(ret && waited))
1323 		failure_goto(("Failed to compress in compthread\n"), error);
1324 
1325 	if (!waited) {
1326 		lock_mutex(control, &output_lock);
1327 		while (output_thread != i)
1328 			cond_wait(control, &output_cond, &output_lock);
1329 		unlock_mutex(control, &output_lock);
1330 		waited = 1;
1331 	}
1332 	if (unlikely(ret)) {
1333 		print_maxverbose("Unable to compress in parallel, waiting for previous thread to complete before trying again\n");
1334 		goto retry;
1335 	}
1336 
1337 	/* Need to be big enough to fill one CBC_LEN */
1338 	if (ENCRYPT)
1339 		write_len = 8;
1340 	else
1341 		write_len = ctis->chunk_bytes;
1342 
1343 	if (!ctis->chunks++) {
1344 		int j;
1345 
1346 		if (TMP_OUTBUF) {
1347 			lock_mutex(control, &control->control_lock);
1348 			if (!control->magic_written)
1349 				write_magic(control);
1350 			unlock_mutex(control, &control->control_lock);
1351 
1352 			if (unlikely(!flush_tmpoutbuf(control))) {
1353 				print_err("Failed to flush_tmpoutbuf in compthread\n");
1354 				goto error;
1355 			}
1356 		}
1357 
1358 		print_maxverbose("Writing initial chunk bytes value %d at %lld\n",
1359 				 ctis->chunk_bytes, get_seek(control, ctis->fd));
1360 		/* Write chunk bytes of this block */
1361 		write_u8(control, ctis->chunk_bytes);
1362 
1363 		/* Write whether this is the last chunk, followed by the size
1364 		 * of this chunk */
1365 		print_maxverbose("Writing EOF flag as %d\n", control->eof);
1366 		write_u8(control, control->eof);
1367 		if (!ENCRYPT)
1368 			write_val(control, ctis->size, ctis->chunk_bytes);
1369 
1370 		/* First chunk of this stream, write headers */
1371 		ctis->initial_pos = get_seek(control, ctis->fd);
1372 		if (unlikely(ctis->initial_pos == -1))
1373 			goto error;
1374 
1375 		print_maxverbose("Writing initial header at %lld\n", ctis->initial_pos);
1376 		for (j = 0; j < ctis->num_streams; j++) {
1377 			/* If encrypting, we leave SALT_LEN room to write in salt
1378 			* later */
1379 			if (ENCRYPT) {
1380 				if (unlikely(write_val(control, 0, SALT_LEN)))
1381 					fatal_goto(("Failed to write_buf blank salt in compthread %d\n", i), error);
1382 				ctis->cur_pos += SALT_LEN;
1383 			}
1384 			ctis->s[j].last_head = ctis->cur_pos + 1 + (write_len * 2);
1385 			write_u8(control, CTYPE_NONE);
1386 			write_val(control, 0, write_len);
1387 			write_val(control, 0, write_len);
1388 			write_val(control, 0, write_len);
1389 			ctis->cur_pos += 1 + (write_len * 3);
1390 		}
1391 	}
1392 
1393 	print_maxverbose("Compthread %ld seeking to %lld to store length %d\n", i, ctis->s[cti->streamno].last_head, write_len);
1394 
1395 	if (unlikely(seekto(control, ctis, ctis->s[cti->streamno].last_head)))
1396 		fatal_goto(("Failed to seekto in compthread %d\n", i), error);
1397 
1398 	if (unlikely(write_val(control, ctis->cur_pos, write_len)))
1399 		fatal_goto(("Failed to write_val cur_pos in compthread %d\n", i), error);
1400 
1401 	if (ENCRYPT)
1402 		rewrite_encrypted(control, ctis, ctis->s[cti->streamno].last_head - 17);
1403 
1404 	ctis->s[cti->streamno].last_head = ctis->cur_pos + 1 + (write_len * 2) + (ENCRYPT ? SALT_LEN : 0);
1405 
1406 	print_maxverbose("Compthread %ld seeking to %lld to write header\n", i, ctis->cur_pos);
1407 
1408 	if (unlikely(seekto(control, ctis, ctis->cur_pos)))
1409 		fatal_goto(("Failed to seekto cur_pos in compthread %d\n", i), error);
1410 
1411 	print_maxverbose("Thread %ld writing %lld compressed bytes from stream %d\n", i, padded_len, cti->streamno);
1412 
1413 	if (ENCRYPT) {
1414 		if (unlikely(write_val(control, 0, SALT_LEN)))
1415 			fatal_goto(("Failed to write_buf header salt in compthread %d\n", i), error);
1416 		ctis->cur_pos += SALT_LEN;
1417 		ctis->s[cti->streamno].last_headofs = ctis->cur_pos;
1418 	}
1419 	/* We store the actual c_len even though we might pad it out */
1420 	if (unlikely(write_u8(control, cti->c_type) ||
1421 		write_val(control, cti->c_len, write_len) ||
1422 		write_val(control, cti->s_len, write_len) ||
1423 		write_val(control, 0, write_len))) {
1424 			fatal_goto(("Failed write in compthread %d\n", i), error);
1425 	}
1426 	ctis->cur_pos += 1 + (write_len * 3);
1427 
1428 	if (ENCRYPT) {
1429 		if (unlikely(!get_rand(control, cti->salt, SALT_LEN)))
1430 			goto error;
1431 		if (unlikely(write_buf(control, cti->salt, SALT_LEN)))
1432 			fatal_goto(("Failed to write_buf block salt in compthread %d\n", i), error);
1433 		if (unlikely(!lrz_encrypt(control, cti->s_buf, padded_len, cti->salt)))
1434 			goto error;
1435 		ctis->cur_pos += SALT_LEN;
1436 	}
1437 
1438 	print_maxverbose("Compthread %ld writing data at %lld\n", i, ctis->cur_pos);
1439 
1440 	if (unlikely(write_buf(control, cti->s_buf, padded_len)))
1441 		fatal_goto(("Failed to write_buf s_buf in compthread %d\n", i), error);
1442 
1443 	ctis->cur_pos += padded_len;
1444 	free(cti->s_buf);
1445 
1446 	lock_mutex(control, &output_lock);
1447 	if (++output_thread == control->threads)
1448 		output_thread = 0;
1449 	cond_broadcast(control, &output_cond);
1450 	unlock_mutex(control, &output_lock);
1451 
1452 error:
1453 	cksem_post(control, &cti->cksem);
1454 
1455 	return NULL;
1456 }
1457 
/* Hand the current buffer of stream streamno over to a compression
 * thread. Thread slots are chosen round-robin via the static index i;
 * we block on the slot's semaphore until its previous job completes.
 * When newbuf is set, a fresh empty buffer is allocated for the stream
 * so writing can continue while the old buffer is compressed. */
static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int streamno, int newbuf)
{
	stream_thread_struct *s;
	static int i = 0;	/* round-robin compression thread slot */

	/* Make sure this thread doesn't already exist */
	cksem_wait(control, &cthread[i].cksem);

	/* Ownership of the stream buffer transfers to cthread[i]; the
	 * compthread worker frees it. */
	cthread[i].sinfo = sinfo;
	cthread[i].streamno = streamno;
	cthread[i].s_buf = sinfo->s[streamno].buf;
	cthread[i].s_len = sinfo->s[streamno].buflen;

	print_maxverbose("Starting thread %ld to compress %lld bytes from stream %d\n",
			 i, cthread[i].s_len, streamno);

	/* The thread argument struct is freed by compthread itself. */
	s = malloc(sizeof(stream_thread_struct));
	if (unlikely(!s)) {
		cksem_post(control, &cthread[i].cksem);
		failure("Unable to malloc in clear_buffer");
	}
	s->i = i;
	s->control = control;
	if (unlikely((!create_pthread(control, &threads[i], NULL, compthread, s)) ||
	             (!detach_pthread(control, &threads[i]))))
		failure("Unable to create compthread in clear_buffer");

	if (newbuf) {
		/* The stream buffer has been given to the thread, allocate a
		 * new one. */
		sinfo->s[streamno].buf = malloc(sinfo->bufsize);
		if (unlikely(!sinfo->s[streamno].buf))
			failure("Unable to malloc buffer of size %lld in flush_buffer\n", sinfo->bufsize);
		sinfo->s[streamno].buflen = 0;
	}

	if (++i == control->threads)
		i = 0;
}
1497 
/* flush out any data in a stream buffer */
void flush_buffer(rzip_control *control, struct stream_info *sinfo, int streamno)
{
	/* Hand the buffer to a compression thread and allocate a fresh,
	 * empty replacement (newbuf = 1) for subsequent writes. */
	clear_buffer(control, sinfo, streamno, 1);
}

/* Decompression worker thread: decompresses the block stashed in
 * ucthread[i] according to its c_type. If parallel decompression fails
 * (typically from memory pressure), it waits for its turn in the output
 * order and retries serialised; a second failure is fatal. The
 * heap-allocated argument is freed here. */
static void *ucompthread(void *data)
{
	stream_thread_struct *s = data;
	rzip_control *control = s->control;
	long i = s->i;
	struct uncomp_thread *uci;
	int waited = 0, ret = 0;

	free(data);
	uci = &ucthread[i];

	if (unlikely(setpriority(PRIO_PROCESS, 0, control->nice_val) == -1))
		print_err("Warning, unable to set nice value on thread\n");

retry:
	/* CTYPE_NONE blocks are stored uncompressed; nothing to do. */
	if (uci->c_type != CTYPE_NONE) {
		switch (uci->c_type) {
			case CTYPE_LZMA:
				ret = lzma_decompress_buf(control, uci);
				break;
			case CTYPE_LZO:
				ret = lzo_decompress_buf(control, uci);
				break;
			case CTYPE_BZIP2:
				ret = bzip2_decompress_buf(control, uci);
				break;
			case CTYPE_GZIP:
				ret = gzip_decompress_buf(control, uci);
				break;
			case CTYPE_ZPAQ:
				ret = zpaq_decompress_buf(control, uci, i);
				break;
			default:
				failure_return(("Dunno wtf decompression type to use!\n"), NULL);
				break;
		}
	}

	/* As per compression, serialise the decompression if it fails in
	 * parallel */
	if (unlikely(ret)) {
		if (unlikely(waited))
			failure_return(("Failed to decompress in ucompthread\n"), NULL);
		print_maxverbose("Unable to decompress in parallel, waiting for previous thread to complete before trying again\n");
		/* We do not strictly need to wait for this, so it's used when
		 * decompression fails due to inadequate memory to try again
		 * serialised. */
		lock_mutex(control, &output_lock);
		while (output_thread != i)
			cond_wait(control, &output_cond, &output_lock);
		unlock_mutex(control, &output_lock);
		waited = 1;
		goto retry;
	}

	print_maxverbose("Thread %ld decompressed %lld bytes from stream %d\n", i, uci->u_len, uci->streamno);

	return 0;
}
1563 
1564 /* fill a buffer from a stream - return -1 on failure */
fill_buffer(rzip_control * control,struct stream_info * sinfo,int streamno)1565 static int fill_buffer(rzip_control *control, struct stream_info *sinfo, int streamno)
1566 {
1567 	i64 u_len, c_len, last_head, padded_len, header_length;
1568 	uchar enc_head[25 + SALT_LEN], blocksalt[SALT_LEN];
1569 	struct stream *s = &sinfo->s[streamno];
1570 	stream_thread_struct *st;
1571 	uchar c_type, *s_buf;
1572 
1573 	if (s->buf)
1574 		free(s->buf);
1575 	if (s->eos)
1576 		goto out;
1577 fill_another:
1578 	if (unlikely(ucthread[s->uthread_no].busy))
1579 		failure_return(("Trying to start a busy thread, this shouldn't happen!\n"), -1);
1580 
1581 	if (unlikely(read_seekto(control, sinfo, s->last_head)))
1582 		return -1;
1583 
1584 	if (ENCRYPT) {
1585 		if (unlikely(read_buf(control, sinfo->fd, enc_head, SALT_LEN)))
1586 			return -1;
1587 		sinfo->total_read += SALT_LEN;
1588 	}
1589 
1590 	if (unlikely(read_u8(control, sinfo->fd, &c_type)))
1591 		return -1;
1592 
1593 	/* Compatibility crap for versions < 0.4 */
1594 	if (control->major_version == 0 && control->minor_version < 4) {
1595 		u32 c_len32, u_len32, last_head32;
1596 
1597 		if (unlikely(read_u32(control, sinfo->fd, &c_len32)))
1598 			return -1;
1599 		if (unlikely(read_u32(control, sinfo->fd, &u_len32)))
1600 			return -1;
1601 		if (unlikely(read_u32(control, sinfo->fd, &last_head32)))
1602 			return -1;
1603 		c_len = c_len32;
1604 		u_len = u_len32;
1605 		last_head = last_head32;
1606 		header_length = 13;
1607 	} else {
1608 		int read_len;
1609 
1610 		print_maxverbose("Reading ucomp header at %lld\n", get_readseek(control, sinfo->fd));
1611 		if ((control->major_version == 0 && control->minor_version < 6) || ENCRYPT)
1612 			read_len = 8;
1613 		else
1614 			read_len = sinfo->chunk_bytes;
1615 		if (unlikely(read_val(control, sinfo->fd, &c_len, read_len)))
1616 			return -1;
1617 		if (unlikely(read_val(control, sinfo->fd, &u_len, read_len)))
1618 			return -1;
1619 		if (unlikely(read_val(control, sinfo->fd, &last_head, read_len)))
1620 			return -1;
1621 		header_length = 1 + (read_len * 3);
1622 	}
1623 	sinfo->total_read += header_length;
1624 
1625 	if (ENCRYPT) {
1626 		if (unlikely(!decrypt_header(control, enc_head, &c_type, &c_len, &u_len, &last_head)))
1627 			return -1;
1628 		if (unlikely(read_buf(control, sinfo->fd, blocksalt, SALT_LEN)))
1629 			return -1;
1630 		sinfo->total_read += SALT_LEN;
1631 	}
1632 	c_len = le64toh(c_len);
1633 	u_len = le64toh(u_len);
1634 	last_head = le64toh(last_head);
1635 	print_maxverbose("Fill_buffer stream %d c_len %lld u_len %lld last_head %lld\n", streamno, c_len, u_len, last_head);
1636 
1637 	padded_len = MAX(c_len, MIN_SIZE);
1638 	sinfo->total_read += padded_len;
1639 	fsync(control->fd_out);
1640 
1641 	if (unlikely(u_len > control->maxram))
1642 		fatal_return(("Unable to malloc buffer of size %lld in this environment\n", u_len), -1);
1643 	s_buf = malloc(MAX(u_len, MIN_SIZE));
1644 	if (unlikely(u_len && !s_buf))
1645 		fatal_return(("Unable to malloc buffer of size %lld in fill_buffer\n", u_len), -1);
1646 	sinfo->ram_alloced += u_len;
1647 
1648 	if (unlikely(read_buf(control, sinfo->fd, s_buf, padded_len)))
1649 		return -1;
1650 
1651 	if (ENCRYPT) {
1652 		if (unlikely(!lrz_decrypt(control, s_buf, padded_len, blocksalt)))
1653 			return -1;
1654 	}
1655 
1656 	ucthread[s->uthread_no].s_buf = s_buf;
1657 	ucthread[s->uthread_no].c_len = c_len;
1658 	ucthread[s->uthread_no].u_len = u_len;
1659 	ucthread[s->uthread_no].c_type = c_type;
1660 	ucthread[s->uthread_no].streamno = streamno;
1661 	s->last_head = last_head;
1662 
1663 	/* List this thread as busy */
1664 	ucthread[s->uthread_no].busy = 1;
1665 	print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n",
1666 			 s->uthread_no, padded_len, streamno);
1667 
1668 	st = malloc(sizeof(stream_thread_struct));
1669 	if (unlikely(!st))
1670 		fatal_return(("Unable to malloc in fill_buffer"), -1);
1671 	st->i = s->uthread_no;
1672 	st->control = control;
1673 	if (unlikely(!create_pthread(control, &threads[s->uthread_no], NULL, ucompthread, st))) {
1674 		free(st);
1675 		return -1;
1676 	}
1677 
1678 	if (++s->uthread_no == s->base_thread + s->total_threads)
1679 		s->uthread_no = s->base_thread;
1680 
1681 	/* Reached the end of this stream, no more data to read in, otherwise
1682 	 * see if the next thread is free to grab more data. We also check that
1683 	 * we're not going to be allocating too much ram to generate all these
1684 	 * threads. */
1685 	if (!last_head)
1686 		s->eos = 1;
1687 	else if (s->uthread_no != s->unext_thread && !ucthread[s->uthread_no].busy &&
1688 		 sinfo->ram_alloced < control->maxram)
1689 			goto fill_another;
1690 out:
1691 	lock_mutex(control, &output_lock);
1692 	output_thread = s->unext_thread;
1693 	cond_broadcast(control, &output_cond);
1694 	unlock_mutex(control, &output_lock);
1695 
1696 	/* join_pthread here will make it wait till the data is ready */
1697 	if (unlikely(!join_pthread(control, threads[s->unext_thread], NULL)))
1698 		return -1;
1699 	ucthread[s->unext_thread].busy = 0;
1700 
1701 	print_maxverbose("Taking decompressed data from thread %ld\n", s->unext_thread);
1702 	s->buf = ucthread[s->unext_thread].s_buf;
1703 	s->buflen = ucthread[s->unext_thread].u_len;
1704 	sinfo->ram_alloced -= s->buflen;
1705 	s->bufp = 0;
1706 
1707 	if (++s->unext_thread == s->base_thread + s->total_threads)
1708 		s->unext_thread = s->base_thread;
1709 
1710 	return 0;
1711 }
1712 
1713 /* write some data to a stream. Return -1 on failure */
write_stream(rzip_control * control,void * ss,int streamno,uchar * p,i64 len)1714 void write_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 len)
1715 {
1716 	struct stream_info *sinfo = ss;
1717 
1718 	while (len) {
1719 		i64 n;
1720 
1721 		n = MIN(sinfo->bufsize - sinfo->s[streamno].buflen, len);
1722 
1723 		memcpy(sinfo->s[streamno].buf + sinfo->s[streamno].buflen, p, n);
1724 		sinfo->s[streamno].buflen += n;
1725 		p += n;
1726 		len -= n;
1727 
1728 		/* Flush the buffer every sinfo->bufsize into one thread */
1729 		if (sinfo->s[streamno].buflen == sinfo->bufsize)
1730 			flush_buffer(control, sinfo, streamno);
1731 	}
1732 }
1733 
1734 /* read some data from a stream. Return number of bytes read, or -1
1735    on failure */
read_stream(rzip_control * control,void * ss,int streamno,uchar * p,i64 len)1736 i64 read_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 len)
1737 {
1738 	struct stream_info *sinfo = ss;
1739 	i64 ret = 0;
1740 
1741 	while (len) {
1742 		i64 n;
1743 
1744 		n = MIN(sinfo->s[streamno].buflen - sinfo->s[streamno].bufp, len);
1745 
1746 		if (n > 0) {
1747 			memcpy(p, sinfo->s[streamno].buf + sinfo->s[streamno].bufp, n);
1748 			sinfo->s[streamno].bufp += n;
1749 			p += n;
1750 			len -= n;
1751 			ret += n;
1752 		}
1753 
1754 		if (len && sinfo->s[streamno].bufp == sinfo->s[streamno].buflen) {
1755 			if (unlikely(fill_buffer(control, sinfo, streamno)))
1756 				return -1;
1757 			if (sinfo->s[streamno].bufp == sinfo->s[streamno].buflen)
1758 				break;
1759 		}
1760 	}
1761 
1762 	return ret;
1763 }
1764 
/* flush and close down a stream. return -1 on failure */
int close_stream_out(rzip_control *control, void *ss)
{
	struct stream_info *sinfo = ss;
	int i;

	/* Hand any remaining buffered data to compression threads without
	 * allocating replacement buffers (newbuf = 0). */
	for (i = 0; i < sinfo->num_streams; i++)
		clear_buffer(control, sinfo, i, 0);

	if (ENCRYPT) {
		/* Last two compressed blocks do not have an offset written
		 * to them so we have to go back and encrypt them now, but we
		 * must wait till the threads return. */
		int close_thread = output_thread;

		/* Wait/post each semaphore in output order so every worker
		 * has finished before the headers are rewritten. */
		for (i = 0; i < control->threads; i++) {
			cksem_wait(control, &cthread[close_thread].cksem);
			cksem_post(control, &cthread[close_thread].cksem);
			if (++close_thread == control->threads)
				close_thread = 0;
		}
		for (i = 0; i < sinfo->num_streams; i++)
			rewrite_encrypted(control, sinfo, sinfo->s[i].last_headofs);
	}
	/* In library mode sinfo cannot be freed yet (see note below), so it
	 * is queued in a growable array of STREAM_BUCKET_SIZE buckets. */
	if (control->library_mode) {
		if (!control->sinfo_buckets) {
			/* no streams added */
			control->sinfo_queue = calloc(STREAM_BUCKET_SIZE + 1, sizeof(void*));
			if (!control->sinfo_queue) {
				print_err("Failed to calloc sinfo_queue in close_stream_out\n");
				return -1;
			}
			control->sinfo_buckets++;
		} else if (control->sinfo_idx == STREAM_BUCKET_SIZE * control->sinfo_buckets + 1) {
			/* all buckets full, create new bucket */
			void *tmp;

			tmp = realloc(control->sinfo_queue, (++control->sinfo_buckets * STREAM_BUCKET_SIZE + 1) * sizeof(void*));
			if (!tmp) {
				print_err("Failed to realloc sinfo_queue in close_stream_out\n");
				return -1;
			}
			control->sinfo_queue = tmp;
			/* Zero only the newly grown tail of the queue. */
			memset(control->sinfo_queue + control->sinfo_idx, 0, ((control->sinfo_buckets * STREAM_BUCKET_SIZE + 1) - control->sinfo_idx) * sizeof(void*));
		}
		control->sinfo_queue[control->sinfo_idx++] = sinfo;
	}
#if 0
	/* These cannot be freed immediately because their values are read after the next
	 * stream has started. Instead (in library mode), they are stored and only freed
	 * after the entire operation has completed.
	 */
	free(sinfo->s);
	free(sinfo);
#endif
	return 0;
}
1822 
1823 /* close down an input stream */
/* Close down an input stream: seek past any unread stream data, then
 * release the per-stream buffers and the global thread bookkeeping arrays.
 *
 * control: global control structure (fd_in, verbosity settings)
 * ss:      opaque pointer to the struct stream_info being closed
 *
 * Returns 0 on success, -1 if the final seek fails (buffers are not
 * freed in that case, matching the original error behaviour).
 */
int close_stream_in(rzip_control *control, void *ss)
{
	struct stream_info *sinfo = ss;
	int i;

	print_maxverbose("Closing stream at %lld, want to seek to %lld\n",
			 get_readseek(control, control->fd_in),
			 sinfo->initial_pos + sinfo->total_read);
	/* Position the input at the end of this stream's data so whatever
	 * follows (next stream or trailer) is read from the right offset. */
	if (unlikely(read_seekto(control, sinfo, sinfo->total_read)))
		return -1;

	for (i = 0; i < sinfo->num_streams; i++)
		free(sinfo->s[i].buf);

	output_thread = 0;
	/* NULL the freed globals so a later close or error path cannot
	 * double-free them or use them after free. */
	free(ucthread);
	ucthread = NULL;
	free(threads);
	threads = NULL;
	free(sinfo->s);
	free(sinfo);

	return 0;
}
1846 
1847 /* As others are slow and lzo very fast, it is worth doing a quick lzo pass
1848    to see if there is any compression at all with lzo first. It is unlikely
1849    that others will be able to compress if lzo is unable to drop a single byte
1850    so do not compress any block that is incompressible by lzo. */
/* As others are slow and lzo very fast, it is worth doing a quick lzo pass
   to see if there is any compression at all with lzo first. It is unlikely
   that others will be able to compress if lzo is unable to drop a single byte
   so do not compress any block that is incompressible by lzo.

   Tests progressively larger windows of s_buf (starting small, doubling up
   to STREAM_BUFSIZE) and succeeds as soon as any window shrinks.

   Returns 1 if the data looks compressible (or testing is disabled via
   LZO_TEST), 0 if lzo could not shrink any tested window or on allocation
   failure. */
static int lzo_compresses(rzip_control *control, uchar *s_buf, i64 s_len)
{
	lzo_bytep wrkmem = NULL;
	lzo_uint in_len, test_len = s_len, save_len = s_len;
	lzo_uint dlen;
	uchar *c_buf = NULL, *test_buf = s_buf;
	/* set minimum buffer test size based on the length of the test stream */
	unsigned long buftest_size = (test_len > 5 * STREAM_BUFSIZE ? STREAM_BUFSIZE : STREAM_BUFSIZE / 4096);
	int ret = 0;
	int workcounter = 0;	/* count # of passes */
	lzo_uint best_dlen = UINT_MAX; /* save best compression estimate */

	if (!LZO_TEST)
		return 1;
	wrkmem = malloc(LZO1X_1_MEM_COMPRESS);
	if (unlikely(wrkmem == NULL))
		fatal_return(("Unable to allocate wrkmem in lzo_compresses\n"), 0);

	in_len = MIN(test_len, buftest_size);
	/* Worst-case lzo1x expansion for a STREAM_BUFSIZE input; in_len never
	 * exceeds STREAM_BUFSIZE so this capacity is always sufficient. */
	dlen = STREAM_BUFSIZE + STREAM_BUFSIZE / 16 + 64 + 3;

	c_buf = malloc(dlen);
	if (unlikely(!c_buf)) {
		free(wrkmem);
		fatal_return(("Unable to allocate c_buf in lzo_compresses\n"), 0);
	}

	/* Test progressively larger blocks at a time and as soon as anything
	   compressible is found, jump out as a success */
	while (test_len > 0) {
		workcounter++;
		lzo1x_1_compress(test_buf, in_len, (uchar *)c_buf, &dlen, wrkmem);

		if (dlen < best_dlen)
			best_dlen = dlen;	/* save best value */

		if (dlen < in_len) {
			ret = 1;
			break;
		}
		/* expand and move buffer */
		test_len -= in_len;
		if (test_len) {
			test_buf += (ptrdiff_t)in_len;
			if (buftest_size < STREAM_BUFSIZE)
				buftest_size <<= 1;
			in_len = MIN(test_len, buftest_size);
		}
	}
	/* lzo_uint is unsigned and may be wider than long: cast for a matching
	 * format specifier. Guard against in_len == 0 (s_len == 0 input) which
	 * would otherwise print NaN from a 0/0 division. */
	print_maxverbose("lzo testing %s for chunk %lu. Compressed size = %5.2F%% of chunk, %d Passes\n",
			(ret == 0 ? "FAILED" : "OK"), (unsigned long)save_len,
			in_len ? 100 * ((double)best_dlen / (double)in_len) : 0.0,
			workcounter);

	free(wrkmem);
	free(c_buf);

	return ret;
}
1909