1 // SPDX-License-Identifier: 0BSD
2 
3 ///////////////////////////////////////////////////////////////////////////////
4 //
5 /// \file       stream_encoder_mt.c
6 /// \brief      Multithreaded .xz Stream encoder
7 //
8 //  Author:     Lasse Collin
9 //
10 ///////////////////////////////////////////////////////////////////////////////
11 
12 #include "filter_encoder.h"
13 #include "easy_preset.h"
14 #include "block_encoder.h"
15 #include "block_buffer_encoder.h"
16 #include "index_encoder.h"
17 #include "outqueue.h"
18 
19 
/// Maximum supported Block size. Capping the Block size to
/// UINT64_MAX / LZMA_THREADS_MAX makes it simpler to prevent integer
/// overflows if we are given an unusually large Block size.
/// get_options() rejects anything bigger with LZMA_OPTIONS_ERROR.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
23 
24 
/// State of a worker thread. The state is stored in worker_thread.state.
///
/// NOTE: The order of the constants matters. The code in this file uses
/// relational comparisons: "state >= THR_STOP" means that the main thread
/// has asked the worker to stop or exit, and "state <= THR_FINISH" (when
/// the state isn't THR_IDLE) means that there is a Block to encode.
typedef enum {
	/// Waiting for work.
	THR_IDLE,

	/// Encoding is in progress.
	THR_RUN,

	/// Encoding is in progress but no more input data will
	/// be read.
	THR_FINISH,

	/// The main thread wants the thread to stop whatever it was doing
	/// but not exit.
	THR_STOP,

	/// The main thread wants the thread to exit. We could use
	/// cancellation but since there's THR_STOP anyway, this is lazier.
	THR_EXIT,

} worker_state;
45 
typedef struct lzma_stream_coder_s lzma_stream_coder;

/// Per-thread data of one worker thread. The structures are stored in
/// the array lzma_stream_coder.threads[]; a structure is initialized by
/// initialize_new_thread() right before its thread is created.
typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
	/// What the thread is doing or what the main thread has asked
	/// it to do. This is read and written with "mutex" locked, and
	/// "cond" is signaled when the value is changed.
	worker_state state;

	/// Input buffer of coder->block_size bytes. The main thread will
	/// put new input into this and update in_size accordingly. Once
	/// no more input is coming, state will be set to THR_FINISH.
	uint8_t *in;

	/// Amount of data available in the input buffer. This is modified
	/// only by the main thread.
	size_t in_size;

	/// Output buffer for this thread. This is set by the main
	/// thread every time a new Block is started with this thread
	/// structure.
	lzma_outbuf *outbuf;

	/// Pointer to the main structure is needed when putting this
	/// thread back to the stack of free threads.
	lzma_stream_coder *coder;

	/// The allocator is set by the main thread. Since a copy of the
	/// pointer is kept here, the application must not change the
	/// allocator before calling lzma_end().
	const lzma_allocator *allocator;

	/// Amount of uncompressed data that has already been compressed.
	/// The worker resets this to zero when it has finished a Block
	/// and has added the final amount to coder->progress_in.
	uint64_t progress_in;

	/// Amount of compressed data that is ready.
	/// This is reset to zero the same way as progress_in.
	uint64_t progress_out;

	/// Block encoder
	lzma_next_coder block_encoder;

	/// Compression options for this Block
	lzma_block block_options;

	/// Filter chain for this thread. By copying the filters array
	/// to each thread it is possible to change the filter chain
	/// between Blocks using lzma_filters_update().
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// Next structure in the stack of free worker threads.
	worker_thread *next;

	/// Mutex and condition variable for passing changes of "state"
	/// and "in_size" between the main thread and this worker. Whoever
	/// changes those fields does it with the mutex locked and then
	/// signals the condition variable.
	mythread_mutex mutex;
	mythread_cond cond;

	/// The ID of this thread is used to join the thread
	/// when it's not needed anymore.
	mythread thread_id;
};
102 
103 
/// State of the multithreaded Stream encoder. This is owned by the main
/// thread; the worker threads access it via worker_thread.coder.
struct lzma_stream_coder_s {
	/// Which part of the .xz Stream is being written to the output.
	/// stream_encode_mt() goes through these in this order.
	enum {
		SEQ_STREAM_HEADER,
		SEQ_BLOCK,
		SEQ_INDEX,
		SEQ_STREAM_FOOTER,
	} sequence;

	/// Start a new Block every block_size bytes of input unless
	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
	size_t block_size;

	/// The filter chain to use for the next Block.
	/// This can be updated using lzma_filters_update()
	/// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// A copy of filters[] will be put here when attempting to get
	/// a new worker thread. This will be copied to a worker thread
	/// when a thread becomes free and then this cache is marked as
	/// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
	/// the filter options from filters[] would get uselessly copied
	/// multiple times (allocated and freed) when waiting for a new free
	/// worker thread.
	///
	/// This is freed if filters[] is updated via lzma_filters_update().
	lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];


	/// Index to hold sizes of the Blocks
	lzma_index *index;

	/// Index encoder
	lzma_next_coder index_encoder;


	/// Stream Flags for encoding the Stream Header and Stream Footer.
	lzma_stream_flags stream_flags;

	/// Buffer to hold Stream Header and Stream Footer.
	uint8_t header[LZMA_STREAM_HEADER_SIZE];

	/// Read position in header[]
	size_t header_pos;


	/// Output buffer queue for compressed data
	lzma_outq outq;

	/// How much memory to allocate for each lzma_outbuf.buf
	size_t outbuf_alloc_size;


	/// Maximum wait time if cannot use all the input and cannot
	/// fill the output buffer. This is in milliseconds.
	/// Zero means that there is no timeout (see wait_for_work()).
	uint32_t timeout;


	/// Error code from a worker thread. This stays as LZMA_OK until
	/// a worker calls worker_error(); only the first error is kept.
	lzma_ret thread_error;

	/// Array of allocated thread-specific structures
	worker_thread *threads;

	/// Number of structures in "threads" above. This is also the
	/// number of threads that will be created at maximum.
	uint32_t threads_max;

	/// Number of thread structures that have been initialized, and
	/// thus the number of worker threads actually created so far.
	uint32_t threads_initialized;

	/// Stack of free threads. When a thread finishes, it puts itself
	/// back into this stack. This starts as empty because threads
	/// are created only when actually needed.
	worker_thread *threads_free;

	/// The most recent worker thread to which the main thread writes
	/// the new input from the application.
	/// This is NULL when no Block is currently being filled.
	worker_thread *thr;


	/// Amount of uncompressed data in Blocks that have already
	/// been finished.
	uint64_t progress_in;

	/// Amount of compressed data in Stream Header + Blocks that
	/// have already been finished.
	uint64_t progress_out;


	/// The worker threads lock this mutex when they set thread_error,
	/// mark their output buffer as finished, update progress_in and
	/// progress_out, or push themselves to threads_free. They signal
	/// the condition variable after doing so, which wakes up the main
	/// thread if it is waiting in wait_for_work().
	mythread_mutex mutex;
	mythread_cond cond;
};
198 
199 
200 /// Tell the main thread that something has gone wrong.
201 static void
worker_error(worker_thread * thr,lzma_ret ret)202 worker_error(worker_thread *thr, lzma_ret ret)
203 {
204 	assert(ret != LZMA_OK);
205 	assert(ret != LZMA_STREAM_END);
206 
207 	mythread_sync(thr->coder->mutex) {
208 		if (thr->coder->thread_error == LZMA_OK)
209 			thr->coder->thread_error = ret;
210 
211 		mythread_cond_signal(&thr->coder->cond);
212 	}
213 
214 	return;
215 }
216 
217 
/// \brief      Encode one Block in a worker thread
///
/// The Block encoder is initialized with thr->filters and then the input
/// that the main thread puts into thr->in is compressed into
/// thr->outbuf->buf. If the output buffer becomes full before the Block
/// encoder has finished, the data is treated as incompressible and the
/// whole input is encoded again using lzma_block_uncomp_encode().
/// On success the sizes needed for the Index are stored into thr->outbuf.
///
/// \param      thr       Worker thread whose Block will be encoded
/// \param      out_pos   On success this is set to the number of bytes
///                       written to thr->outbuf->buf.
/// \param      state     This is effectively a local variable: the value
///                       is overwritten from thr->state before it is
///                       read for the first time.
///
/// \return     THR_FINISH if the Block was encoded successfully.
///             THR_STOP if an error occurred; the error code has been
///             passed to the main thread with worker_error().
///             THR_STOP or THR_EXIT if the main thread set such a state
///             while the Block was being encoded.
static worker_state
worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
{
	assert(thr->progress_in == 0);
	assert(thr->progress_out == 0);

	// Set the Block options.
	thr->block_options = (lzma_block){
		.version = 0,
		.check = thr->coder->stream_flags.check,
		.compressed_size = thr->outbuf->allocated,
		.uncompressed_size = thr->coder->block_size,
		.filters = thr->filters,
	};

	// Calculate maximum size of the Block Header. This amount is
	// reserved in the beginning of the buffer so that Block Header
	// along with Compressed Size and Uncompressed Size can be
	// written there.
	lzma_ret ret = lzma_block_header_size(&thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Initialize the Block encoder.
	ret = lzma_block_encoder_init(&thr->block_encoder,
			thr->allocator, &thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// in_pos is how much of thr->in has been consumed by the Block
	// encoder. in_size is this thread's latest copy of thr->in_size.
	size_t in_pos = 0;
	size_t in_size = 0;

	// Leave room for the Block Header in the beginning of the buffer.
	*out_pos = thr->block_options.header_size;
	const size_t out_size = thr->outbuf->allocated;

	do {
		mythread_sync(thr->mutex) {
			// Store in_pos and *out_pos into *thr so that
			// an application may read them via
			// lzma_get_progress() to get progress information.
			//
			// NOTE: These aren't updated when the encoding
			// finishes. Instead, the final values are taken
			// later from thr->outbuf.
			thr->progress_in = in_pos;
			thr->progress_out = *out_pos;

			// Sleep until the main thread has given us more
			// input or has changed the state.
			while (in_size == thr->in_size
					&& thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		// Return if we were asked to stop or exit.
		if (state >= THR_STOP)
			return state;

		lzma_action action = state == THR_FINISH
				? LZMA_FINISH : LZMA_RUN;

		// Limit the amount of input given to the Block encoder
		// at once. This way this thread can react fairly quickly
		// if the main thread wants us to stop or exit.
		//
		// If the input is truncated here, LZMA_RUN must be used
		// even in the THR_FINISH state because more input will be
		// given to the Block encoder on a later iteration.
		static const size_t in_chunk_max = 16384;
		size_t in_limit = in_size;
		if (in_size - in_pos > in_chunk_max) {
			in_limit = in_pos + in_chunk_max;
			action = LZMA_RUN;
		}

		ret = thr->block_encoder.code(
				thr->block_encoder.coder, thr->allocator,
				thr->in, &in_pos, in_limit, thr->outbuf->buf,
				out_pos, out_size, action);
	} while (ret == LZMA_OK && *out_pos < out_size);

	// The loop ends when the Block encoder returns something else than
	// LZMA_OK or when the output buffer has become full (ret is still
	// LZMA_OK in that case).
	switch (ret) {
	case LZMA_STREAM_END:
		assert(state == THR_FINISH);

		// Encode the Block Header. By doing it after
		// the compression, we can store the Compressed Size
		// and Uncompressed Size fields.
		ret = lzma_block_header_encode(&thr->block_options,
				thr->outbuf->buf);
		if (ret != LZMA_OK) {
			worker_error(thr, ret);
			return THR_STOP;
		}

		break;

	case LZMA_OK:
		// The data was incompressible. Encode it using uncompressed
		// LZMA2 chunks.
		//
		// First wait that we have gotten all the input.
		mythread_sync(thr->mutex) {
			while (thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		if (state >= THR_STOP)
			return state;

		// Do the encoding. This takes care of the Block Header too.
		// The output written so far is discarded by starting again
		// from the beginning of the output buffer.
		*out_pos = 0;
		ret = lzma_block_uncomp_encode(&thr->block_options,
				thr->in, in_size, thr->outbuf->buf,
				out_pos, out_size);

		// It shouldn't fail.
		if (ret != LZMA_OK) {
			worker_error(thr, LZMA_PROG_ERROR);
			return THR_STOP;
		}

		break;

	default:
		// The Block encoder returned an error.
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Set the size information that will be read by the main thread
	// to write the Index field.
	thr->outbuf->unpadded_size
			= lzma_block_unpadded_size(&thr->block_options);
	assert(thr->outbuf->unpadded_size != 0);
	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

	return THR_FINISH;
}
360 
361 
/// \brief      Start function of a worker thread
///
/// The thread repeats the following until it is told to exit: wait
/// until the main thread changes thr->state to something else than
/// THR_IDLE, encode one Block with worker_encode(), mark itself as
/// idle, and finally, with coder->mutex locked, make the result visible
/// to the main thread and push itself to the stack of free threads.
///
/// When the state is THR_EXIT, the thread frees its own resources
/// (filter options, mutex, condition variable, Block encoder, and the
/// input buffer) and returns.
///
/// \param      thr_ptr   Pointer to the worker_thread structure of
///                       this thread
///
/// \return     MYTHREAD_RET_VALUE
static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
	worker_thread *thr = thr_ptr;
	worker_state state = THR_IDLE; // Init to silence a warning

	while (true) {
		// Wait for work.
		mythread_sync(thr->mutex) {
			while (true) {
				// The thread is already idle so if we are
				// requested to stop, just set the state.
				if (thr->state == THR_STOP) {
					thr->state = THR_IDLE;
					mythread_cond_signal(&thr->cond);
				}

				state = thr->state;
				if (state != THR_IDLE)
					break;

				mythread_cond_wait(&thr->cond, &thr->mutex);
			}
		}

		size_t out_pos = 0;

		assert(state != THR_IDLE);
		assert(state != THR_STOP);

		// At this point the state is THR_RUN, THR_FINISH, or
		// THR_EXIT. With the first two there is a Block to encode.
		if (state <= THR_FINISH)
			state = worker_encode(thr, &out_pos, state);

		if (state == THR_EXIT)
			break;

		// Mark the thread as idle unless the main thread has
		// told us to exit. Signal is needed for the case
		// where the main thread is waiting for the threads to stop.
		mythread_sync(thr->mutex) {
			if (thr->state != THR_EXIT) {
				thr->state = THR_IDLE;
				mythread_cond_signal(&thr->cond);
			}
		}

		mythread_sync(thr->coder->mutex) {
			// If no errors occurred, make the encoded data
			// available to be copied out.
			if (state == THR_FINISH) {
				thr->outbuf->pos = out_pos;
				thr->outbuf->finished = true;
			}

			// Update the main progress info.
			//
			// NOTE: This is done no matter what the value
			// of "state" is.
			thr->coder->progress_in
					+= thr->outbuf->uncompressed_size;
			thr->coder->progress_out += out_pos;
			thr->progress_in = 0;
			thr->progress_out = 0;

			// Return this thread to the stack of free threads.
			thr->next = thr->coder->threads_free;
			thr->coder->threads_free = thr;

			// Wake up the main thread in case it is waiting
			// on this condition variable.
			mythread_cond_signal(&thr->coder->cond);
		}
	}

	// Exiting, free the resources.
	lzma_filters_free(thr->filters, thr->allocator);

	mythread_mutex_destroy(&thr->mutex);
	mythread_cond_destroy(&thr->cond);

	lzma_next_end(&thr->block_encoder, thr->allocator);
	lzma_free(thr->in, thr->allocator);
	return MYTHREAD_RET_VALUE;
}
441 
442 
443 /// Make the threads stop but not exit. Optionally wait for them to stop.
444 static void
threads_stop(lzma_stream_coder * coder,bool wait_for_threads)445 threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
446 {
447 	// Tell the threads to stop.
448 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
449 		mythread_sync(coder->threads[i].mutex) {
450 			coder->threads[i].state = THR_STOP;
451 			mythread_cond_signal(&coder->threads[i].cond);
452 		}
453 	}
454 
455 	if (!wait_for_threads)
456 		return;
457 
458 	// Wait for the threads to settle in the idle state.
459 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
460 		mythread_sync(coder->threads[i].mutex) {
461 			while (coder->threads[i].state != THR_IDLE)
462 				mythread_cond_wait(&coder->threads[i].cond,
463 						&coder->threads[i].mutex);
464 		}
465 	}
466 
467 	return;
468 }
469 
470 
471 /// Stop the threads and free the resources associated with them.
472 /// Wait until the threads have exited.
473 static void
threads_end(lzma_stream_coder * coder,const lzma_allocator * allocator)474 threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
475 {
476 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
477 		mythread_sync(coder->threads[i].mutex) {
478 			coder->threads[i].state = THR_EXIT;
479 			mythread_cond_signal(&coder->threads[i].cond);
480 		}
481 	}
482 
483 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
484 		int ret = mythread_join(coder->threads[i].thread_id);
485 		assert(ret == 0);
486 		(void)ret;
487 	}
488 
489 	lzma_free(coder->threads, allocator);
490 	return;
491 }
492 
493 
494 /// Initialize a new worker_thread structure and create a new thread.
495 static lzma_ret
initialize_new_thread(lzma_stream_coder * coder,const lzma_allocator * allocator)496 initialize_new_thread(lzma_stream_coder *coder,
497 		const lzma_allocator *allocator)
498 {
499 	worker_thread *thr = &coder->threads[coder->threads_initialized];
500 
501 	thr->in = lzma_alloc(coder->block_size, allocator);
502 	if (thr->in == NULL)
503 		return LZMA_MEM_ERROR;
504 
505 	if (mythread_mutex_init(&thr->mutex))
506 		goto error_mutex;
507 
508 	if (mythread_cond_init(&thr->cond))
509 		goto error_cond;
510 
511 	thr->state = THR_IDLE;
512 	thr->allocator = allocator;
513 	thr->coder = coder;
514 	thr->progress_in = 0;
515 	thr->progress_out = 0;
516 	thr->block_encoder = LZMA_NEXT_CODER_INIT;
517 	thr->filters[0].id = LZMA_VLI_UNKNOWN;
518 
519 	if (mythread_create(&thr->thread_id, &worker_start, thr))
520 		goto error_thread;
521 
522 	++coder->threads_initialized;
523 	coder->thr = thr;
524 
525 	return LZMA_OK;
526 
527 error_thread:
528 	mythread_cond_destroy(&thr->cond);
529 
530 error_cond:
531 	mythread_mutex_destroy(&thr->mutex);
532 
533 error_mutex:
534 	lzma_free(thr->in, allocator);
535 	return LZMA_MEM_ERROR;
536 }
537 
538 
539 static lzma_ret
get_thread(lzma_stream_coder * coder,const lzma_allocator * allocator)540 get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
541 {
542 	// If there are no free output subqueues, there is no
543 	// point to try getting a thread.
544 	if (!lzma_outq_has_buf(&coder->outq))
545 		return LZMA_OK;
546 
547 	// That's also true if we cannot allocate memory for the output
548 	// buffer in the output queue.
549 	return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
550 			coder->outbuf_alloc_size));
551 
552 	// Make a thread-specific copy of the filter chain. Put it in
553 	// the cache array first so that if we cannot get a new thread yet,
554 	// the allocation is ready when we try again.
555 	if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
556 		return_if_error(lzma_filters_copy(
557 			coder->filters, coder->filters_cache, allocator));
558 
559 	// If there is a free structure on the stack, use it.
560 	mythread_sync(coder->mutex) {
561 		if (coder->threads_free != NULL) {
562 			coder->thr = coder->threads_free;
563 			coder->threads_free = coder->threads_free->next;
564 		}
565 	}
566 
567 	if (coder->thr == NULL) {
568 		// If there are no uninitialized structures left, return.
569 		if (coder->threads_initialized == coder->threads_max)
570 			return LZMA_OK;
571 
572 		// Initialize a new thread.
573 		return_if_error(initialize_new_thread(coder, allocator));
574 	}
575 
576 	// Reset the parts of the thread state that have to be done
577 	// in the main thread.
578 	mythread_sync(coder->thr->mutex) {
579 		coder->thr->state = THR_RUN;
580 		coder->thr->in_size = 0;
581 		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);
582 
583 		// Free the old thread-specific filter options and replace
584 		// them with the already-allocated new options from
585 		// coder->filters_cache[]. Then mark the cache as empty.
586 		lzma_filters_free(coder->thr->filters, allocator);
587 		memcpy(coder->thr->filters, coder->filters_cache,
588 				sizeof(coder->filters_cache));
589 		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
590 
591 		mythread_cond_signal(&coder->thr->cond);
592 	}
593 
594 	return LZMA_OK;
595 }
596 
597 
598 static lzma_ret
stream_encode_in(lzma_stream_coder * coder,const lzma_allocator * allocator,const uint8_t * restrict in,size_t * restrict in_pos,size_t in_size,lzma_action action)599 stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
600 		const uint8_t *restrict in, size_t *restrict in_pos,
601 		size_t in_size, lzma_action action)
602 {
603 	while (*in_pos < in_size
604 			|| (coder->thr != NULL && action != LZMA_RUN)) {
605 		if (coder->thr == NULL) {
606 			// Get a new thread.
607 			const lzma_ret ret = get_thread(coder, allocator);
608 			if (coder->thr == NULL)
609 				return ret;
610 		}
611 
612 		// Copy the input data to thread's buffer.
613 		size_t thr_in_size = coder->thr->in_size;
614 		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
615 				&thr_in_size, coder->block_size);
616 
617 		// Tell the Block encoder to finish if
618 		//  - it has got block_size bytes of input; or
619 		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
620 		//    or LZMA_FULL_BARRIER was used.
621 		//
622 		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
623 		const bool finish = thr_in_size == coder->block_size
624 				|| (*in_pos == in_size && action != LZMA_RUN);
625 
626 		bool block_error = false;
627 
628 		mythread_sync(coder->thr->mutex) {
629 			if (coder->thr->state == THR_IDLE) {
630 				// Something has gone wrong with the Block
631 				// encoder. It has set coder->thread_error
632 				// which we will read a few lines later.
633 				block_error = true;
634 			} else {
635 				// Tell the Block encoder its new amount
636 				// of input and update the state if needed.
637 				coder->thr->in_size = thr_in_size;
638 
639 				if (finish)
640 					coder->thr->state = THR_FINISH;
641 
642 				mythread_cond_signal(&coder->thr->cond);
643 			}
644 		}
645 
646 		if (block_error) {
647 			lzma_ret ret = LZMA_OK; // Init to silence a warning.
648 
649 			mythread_sync(coder->mutex) {
650 				ret = coder->thread_error;
651 			}
652 
653 			return ret;
654 		}
655 
656 		if (finish)
657 			coder->thr = NULL;
658 	}
659 
660 	return LZMA_OK;
661 }
662 
663 
664 /// Wait until more input can be consumed, more output can be read, or
665 /// an optional timeout is reached.
666 static bool
wait_for_work(lzma_stream_coder * coder,mythread_condtime * wait_abs,bool * has_blocked,bool has_input)667 wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
668 		bool *has_blocked, bool has_input)
669 {
670 	if (coder->timeout != 0 && !*has_blocked) {
671 		// Every time when stream_encode_mt() is called via
672 		// lzma_code(), *has_blocked starts as false. We set it
673 		// to true here and calculate the absolute time when
674 		// we must return if there's nothing to do.
675 		//
676 		// This way if we block multiple times for short moments
677 		// less than "timeout" milliseconds, we will return once
678 		// "timeout" amount of time has passed since the *first*
679 		// blocking occurred. If the absolute time was calculated
680 		// again every time we block, "timeout" would effectively
681 		// be meaningless if we never consecutively block longer
682 		// than "timeout" ms.
683 		*has_blocked = true;
684 		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
685 	}
686 
687 	bool timed_out = false;
688 
689 	mythread_sync(coder->mutex) {
690 		// There are four things that we wait. If one of them
691 		// becomes possible, we return.
692 		//  - If there is input left, we need to get a free
693 		//    worker thread and an output buffer for it.
694 		//  - Data ready to be read from the output queue.
695 		//  - A worker thread indicates an error.
696 		//  - Time out occurs.
697 		while ((!has_input || coder->threads_free == NULL
698 					|| !lzma_outq_has_buf(&coder->outq))
699 				&& !lzma_outq_is_readable(&coder->outq)
700 				&& coder->thread_error == LZMA_OK
701 				&& !timed_out) {
702 			if (coder->timeout != 0)
703 				timed_out = mythread_cond_timedwait(
704 						&coder->cond, &coder->mutex,
705 						wait_abs) != 0;
706 			else
707 				mythread_cond_wait(&coder->cond,
708 						&coder->mutex);
709 		}
710 	}
711 
712 	return timed_out;
713 }
714 
715 
/// \brief      Main encoding function of the threaded Stream encoder
///
/// This is called via lzma_code(). coder->sequence tells which part of
/// the Stream is being written: the Stream Header, the Blocks, the Index,
/// or the Stream Footer. When one part has been completed, the execution
/// falls through to the next part within the same call.
///
/// \param      coder_ptr   Pointer to lzma_stream_coder
/// \param      allocator   Allocator for the output queue, the Index, and
///                         the things needed to start new Blocks
/// \param      in          Input buffer
/// \param      in_pos      Read position in in[]
/// \param      in_size     Size of in[]
/// \param      out         Output buffer
/// \param      out_pos     Write position in out[]
/// \param      out_size    Size of out[]
/// \param      action      Action given by the application
///
/// \return     LZMA_OK if more input or more output space is needed.
///             LZMA_STREAM_END when the Stream Footer has been written
///             completely or when LZMA_FULL_FLUSH or LZMA_FULL_BARRIER
///             has been completed. LZMA_TIMED_OUT if the timeout
///             expired while waiting. Otherwise an error code, for
///             example, one reported by a worker thread.
static lzma_ret
stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, uint8_t *restrict out,
		size_t *restrict out_pos, size_t out_size, lzma_action action)
{
	lzma_stream_coder *coder = coder_ptr;

	switch (coder->sequence) {
	case SEQ_STREAM_HEADER:
		// Copy the Stream Header from coder->header[] to out[].
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		if (coder->header_pos < sizeof(coder->header))
			return LZMA_OK;

		// header[] and header_pos are used again
		// for the Stream Footer.
		coder->header_pos = 0;
		coder->sequence = SEQ_BLOCK;

	// Fall through

	case SEQ_BLOCK: {
		// Initialized to silence warnings.
		lzma_vli unpadded_size = 0;
		lzma_vli uncompressed_size = 0;
		lzma_ret ret = LZMA_OK;

		// These are for wait_for_work().
		bool has_blocked = false;
		mythread_condtime wait_abs = { 0 };

		while (true) {
			mythread_sync(coder->mutex) {
				// Check for Block encoder errors.
				ret = coder->thread_error;
				if (ret != LZMA_OK) {
					assert(ret != LZMA_STREAM_END);
					break; // Break out of mythread_sync.
				}

				// Try to read compressed data to out[].
				ret = lzma_outq_read(&coder->outq, allocator,
						out, out_pos, out_size,
						&unpadded_size,
						&uncompressed_size);
			}

			if (ret == LZMA_STREAM_END) {
				// End of Block. Add it to the Index.
				ret = lzma_index_append(coder->index,
						allocator, unpadded_size,
						uncompressed_size);
				if (ret != LZMA_OK) {
					threads_stop(coder, false);
					return ret;
				}

				// If we didn't fill the output buffer yet,
				// try to read more data. Maybe the next
				// outbuf has been finished already too.
				if (*out_pos < out_size)
					continue;
			}

			if (ret != LZMA_OK) {
				// coder->thread_error was set.
				threads_stop(coder, false);
				return ret;
			}

			// Try to give uncompressed data to a worker thread.
			ret = stream_encode_in(coder, allocator,
					in, in_pos, in_size, action);
			if (ret != LZMA_OK) {
				threads_stop(coder, false);
				return ret;
			}

			// See if we should wait or return.
			//
			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
			if (*in_pos == in_size) {
				// LZMA_RUN: More data is probably coming
				// so return to let the caller fill the
				// input buffer.
				if (action == LZMA_RUN)
					return LZMA_OK;

				// LZMA_FULL_BARRIER: The same as with
				// LZMA_RUN but tell the caller that the
				// barrier was completed.
				if (action == LZMA_FULL_BARRIER)
					return LZMA_STREAM_END;

				// Finishing or flushing isn't completed until
				// all input data has been encoded and copied
				// to the output buffer.
				if (lzma_outq_is_empty(&coder->outq)) {
					// LZMA_FINISH: Continue to encode
					// the Index field.
					if (action == LZMA_FINISH)
						break;

					// LZMA_FULL_FLUSH: Return to tell
					// the caller that flushing was
					// completed.
					if (action == LZMA_FULL_FLUSH)
						return LZMA_STREAM_END;
				}
			}

			// Return if there is no output space left.
			// This check must be done after testing the input
			// buffer, because we might want to use a different
			// return code.
			if (*out_pos == out_size)
				return LZMA_OK;

			// Neither in nor out has been used completely.
			// Wait until there's something we can do.
			if (wait_for_work(coder, &wait_abs, &has_blocked,
					*in_pos < in_size))
				return LZMA_TIMED_OUT;
		}

		// All Blocks have been encoded and the threads have stopped.
		// Prepare to encode the Index field.
		return_if_error(lzma_index_encoder_init(
				&coder->index_encoder, allocator,
				coder->index));
		coder->sequence = SEQ_INDEX;

		// Update the progress info to take the Index and
		// Stream Footer into account. Those are very fast to encode
		// so in terms of progress information they can be thought
		// to be ready to be copied out.
		coder->progress_out += lzma_index_size(coder->index)
				+ LZMA_STREAM_HEADER_SIZE;
	}

	// Fall through

	case SEQ_INDEX: {
		// Call the Index encoder. It doesn't take any input, so
		// those pointers can be NULL.
		const lzma_ret ret = coder->index_encoder.code(
				coder->index_encoder.coder, allocator,
				NULL, NULL, 0,
				out, out_pos, out_size, LZMA_RUN);
		if (ret != LZMA_STREAM_END)
			return ret;

		// Encode the Stream Footer into coder->header.
		coder->stream_flags.backward_size
				= lzma_index_size(coder->index);
		if (lzma_stream_footer_encode(&coder->stream_flags,
				coder->header) != LZMA_OK)
			return LZMA_PROG_ERROR;

		coder->sequence = SEQ_STREAM_FOOTER;
	}

	// Fall through

	case SEQ_STREAM_FOOTER:
		// Copy the Stream Footer to out[]. The whole Stream is
		// finished once the Stream Footer has been copied completely.
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		return coder->header_pos < sizeof(coder->header)
				? LZMA_OK : LZMA_STREAM_END;
	}

	// Not reached unless coder->sequence has an invalid value.
	assert(0);
	return LZMA_PROG_ERROR;
}
891 
892 
893 static void
stream_encoder_mt_end(void * coder_ptr,const lzma_allocator * allocator)894 stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
895 {
896 	lzma_stream_coder *coder = coder_ptr;
897 
898 	// Threads must be killed before the output queue can be freed.
899 	threads_end(coder, allocator);
900 	lzma_outq_end(&coder->outq, allocator);
901 
902 	lzma_filters_free(coder->filters, allocator);
903 	lzma_filters_free(coder->filters_cache, allocator);
904 
905 	lzma_next_end(&coder->index_encoder, allocator);
906 	lzma_index_end(coder->index, allocator);
907 
908 	mythread_cond_destroy(&coder->cond);
909 	mythread_mutex_destroy(&coder->mutex);
910 
911 	lzma_free(coder, allocator);
912 	return;
913 }
914 
915 
916 static lzma_ret
stream_encoder_mt_update(void * coder_ptr,const lzma_allocator * allocator,const lzma_filter * filters,const lzma_filter * reversed_filters lzma_attribute ((__unused__)))917 stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
918 		const lzma_filter *filters,
919 		const lzma_filter *reversed_filters
920 			lzma_attribute((__unused__)))
921 {
922 	lzma_stream_coder *coder = coder_ptr;
923 
924 	// Applications shouldn't attempt to change the options when
925 	// we are already encoding the Index or Stream Footer.
926 	if (coder->sequence > SEQ_BLOCK)
927 		return LZMA_PROG_ERROR;
928 
929 	// For now the threaded encoder doesn't support changing
930 	// the options in the middle of a Block.
931 	if (coder->thr != NULL)
932 		return LZMA_PROG_ERROR;
933 
934 	// Check if the filter chain seems mostly valid. See the comment
935 	// in stream_encoder_mt_init().
936 	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
937 		return LZMA_OPTIONS_ERROR;
938 
939 	// Make a copy to a temporary buffer first. This way the encoder
940 	// state stays unchanged if an error occurs in lzma_filters_copy().
941 	lzma_filter temp[LZMA_FILTERS_MAX + 1];
942 	return_if_error(lzma_filters_copy(filters, temp, allocator));
943 
944 	// Free the options of the old chain as well as the cache.
945 	lzma_filters_free(coder->filters, allocator);
946 	lzma_filters_free(coder->filters_cache, allocator);
947 
948 	// Copy the new filter chain in place.
949 	memcpy(coder->filters, temp, sizeof(temp));
950 
951 	return LZMA_OK;
952 }
953 
954 
955 /// Options handling for lzma_stream_encoder_mt_init() and
956 /// lzma_stream_encoder_mt_memusage()
957 static lzma_ret
get_options(const lzma_mt * options,lzma_options_easy * opt_easy,const lzma_filter ** filters,uint64_t * block_size,uint64_t * outbuf_size_max)958 get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
959 		const lzma_filter **filters, uint64_t *block_size,
960 		uint64_t *outbuf_size_max)
961 {
962 	// Validate some of the options.
963 	if (options == NULL)
964 		return LZMA_PROG_ERROR;
965 
966 	if (options->flags != 0 || options->threads == 0
967 			|| options->threads > LZMA_THREADS_MAX)
968 		return LZMA_OPTIONS_ERROR;
969 
970 	if (options->filters != NULL) {
971 		// Filter chain was given, use it as is.
972 		*filters = options->filters;
973 	} else {
974 		// Use a preset.
975 		if (lzma_easy_preset(opt_easy, options->preset))
976 			return LZMA_OPTIONS_ERROR;
977 
978 		*filters = opt_easy->filters;
979 	}
980 
981 	// If the Block size is not set, determine it from the filter chain.
982 	if (options->block_size > 0)
983 		*block_size = options->block_size;
984 	else
985 		*block_size = lzma_mt_block_size(*filters);
986 
987 	// UINT64_MAX > BLOCK_SIZE_MAX, so the second condition
988 	// should be optimized out by any reasonable compiler.
989 	// The second condition should be there in the unlikely event that
990 	// the macros change and UINT64_MAX < BLOCK_SIZE_MAX.
991 	if (*block_size > BLOCK_SIZE_MAX || *block_size == UINT64_MAX)
992 		return LZMA_OPTIONS_ERROR;
993 
994 	// Calculate the maximum amount output that a single output buffer
995 	// may need to hold. This is the same as the maximum total size of
996 	// a Block.
997 	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
998 	if (*outbuf_size_max == 0)
999 		return LZMA_MEM_ERROR;
1000 
1001 	return LZMA_OK;
1002 }
1003 
1004 
1005 static void
get_progress(void * coder_ptr,uint64_t * progress_in,uint64_t * progress_out)1006 get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
1007 {
1008 	lzma_stream_coder *coder = coder_ptr;
1009 
1010 	// Lock coder->mutex to prevent finishing threads from moving their
1011 	// progress info from the worker_thread structure to lzma_stream_coder.
1012 	mythread_sync(coder->mutex) {
1013 		*progress_in = coder->progress_in;
1014 		*progress_out = coder->progress_out;
1015 
1016 		for (size_t i = 0; i < coder->threads_initialized; ++i) {
1017 			mythread_sync(coder->threads[i].mutex) {
1018 				*progress_in += coder->threads[i].progress_in;
1019 				*progress_out += coder->threads[i]
1020 						.progress_out;
1021 			}
1022 		}
1023 	}
1024 
1025 	return;
1026 }
1027 
1028 
1029 static lzma_ret
stream_encoder_mt_init(lzma_next_coder * next,const lzma_allocator * allocator,const lzma_mt * options)1030 stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
1031 		const lzma_mt *options)
1032 {
1033 	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
1034 
1035 	// Get the filter chain.
1036 	lzma_options_easy easy;
1037 	const lzma_filter *filters;
1038 	uint64_t block_size;
1039 	uint64_t outbuf_size_max;
1040 	return_if_error(get_options(options, &easy, &filters,
1041 			&block_size, &outbuf_size_max));
1042 
1043 #if SIZE_MAX < UINT64_MAX
1044 	if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
1045 		return LZMA_MEM_ERROR;
1046 #endif
1047 
1048 	// Validate the filter chain so that we can give an error in this
1049 	// function instead of delaying it to the first call to lzma_code().
1050 	// The memory usage calculation verifies the filter chain as
1051 	// a side effect so we take advantage of that. It's not a perfect
1052 	// check though as raw encoder allows LZMA1 too but such problems
1053 	// will be caught eventually with Block Header encoder.
1054 	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
1055 		return LZMA_OPTIONS_ERROR;
1056 
1057 	// Validate the Check ID.
1058 	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
1059 		return LZMA_PROG_ERROR;
1060 
1061 	if (!lzma_check_is_supported(options->check))
1062 		return LZMA_UNSUPPORTED_CHECK;
1063 
1064 	// Allocate and initialize the base structure if needed.
1065 	lzma_stream_coder *coder = next->coder;
1066 	if (coder == NULL) {
1067 		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
1068 		if (coder == NULL)
1069 			return LZMA_MEM_ERROR;
1070 
1071 		next->coder = coder;
1072 
1073 		// For the mutex and condition variable initializations
1074 		// the error handling has to be done here because
1075 		// stream_encoder_mt_end() doesn't know if they have
1076 		// already been initialized or not.
1077 		if (mythread_mutex_init(&coder->mutex)) {
1078 			lzma_free(coder, allocator);
1079 			next->coder = NULL;
1080 			return LZMA_MEM_ERROR;
1081 		}
1082 
1083 		if (mythread_cond_init(&coder->cond)) {
1084 			mythread_mutex_destroy(&coder->mutex);
1085 			lzma_free(coder, allocator);
1086 			next->coder = NULL;
1087 			return LZMA_MEM_ERROR;
1088 		}
1089 
1090 		next->code = &stream_encode_mt;
1091 		next->end = &stream_encoder_mt_end;
1092 		next->get_progress = &get_progress;
1093 		next->update = &stream_encoder_mt_update;
1094 
1095 		coder->filters[0].id = LZMA_VLI_UNKNOWN;
1096 		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
1097 		coder->index_encoder = LZMA_NEXT_CODER_INIT;
1098 		coder->index = NULL;
1099 		memzero(&coder->outq, sizeof(coder->outq));
1100 		coder->threads = NULL;
1101 		coder->threads_max = 0;
1102 		coder->threads_initialized = 0;
1103 	}
1104 
1105 	// Basic initializations
1106 	coder->sequence = SEQ_STREAM_HEADER;
1107 	coder->block_size = (size_t)(block_size);
1108 	coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
1109 	coder->thread_error = LZMA_OK;
1110 	coder->thr = NULL;
1111 
1112 	// Allocate the thread-specific base structures.
1113 	assert(options->threads > 0);
1114 	if (coder->threads_max != options->threads) {
1115 		threads_end(coder, allocator);
1116 
1117 		coder->threads = NULL;
1118 		coder->threads_max = 0;
1119 
1120 		coder->threads_initialized = 0;
1121 		coder->threads_free = NULL;
1122 
1123 		coder->threads = lzma_alloc(
1124 				options->threads * sizeof(worker_thread),
1125 				allocator);
1126 		if (coder->threads == NULL)
1127 			return LZMA_MEM_ERROR;
1128 
1129 		coder->threads_max = options->threads;
1130 	} else {
1131 		// Reuse the old structures and threads. Tell the running
1132 		// threads to stop and wait until they have stopped.
1133 		threads_stop(coder, true);
1134 	}
1135 
1136 	// Output queue
1137 	return_if_error(lzma_outq_init(&coder->outq, allocator,
1138 			options->threads));
1139 
1140 	// Timeout
1141 	coder->timeout = options->timeout;
1142 
1143 	// Free the old filter chain and the cache.
1144 	lzma_filters_free(coder->filters, allocator);
1145 	lzma_filters_free(coder->filters_cache, allocator);
1146 
1147 	// Copy the new filter chain.
1148 	return_if_error(lzma_filters_copy(
1149 			filters, coder->filters, allocator));
1150 
1151 	// Index
1152 	lzma_index_end(coder->index, allocator);
1153 	coder->index = lzma_index_init(allocator);
1154 	if (coder->index == NULL)
1155 		return LZMA_MEM_ERROR;
1156 
1157 	// Stream Header
1158 	coder->stream_flags.version = 0;
1159 	coder->stream_flags.check = options->check;
1160 	return_if_error(lzma_stream_header_encode(
1161 			&coder->stream_flags, coder->header));
1162 
1163 	coder->header_pos = 0;
1164 
1165 	// Progress info
1166 	coder->progress_in = 0;
1167 	coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1168 
1169 	return LZMA_OK;
1170 }
1171 
1172 
1173 #ifdef HAVE_SYMBOL_VERSIONS_LINUX
1174 // These are for compatibility with binaries linked against liblzma that
1175 // has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
1176 // Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2
1177 // but it has been added here anyway since someone might misread the
1178 // RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
1179 LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
1180 	lzma_ret, lzma_stream_encoder_mt_512a)(
1181 		lzma_stream *strm, const lzma_mt *options)
1182 		lzma_nothrow lzma_attr_warn_unused_result
1183 		__attribute__((__alias__("lzma_stream_encoder_mt_52")));
1184 
1185 LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
1186 	lzma_ret, lzma_stream_encoder_mt_522)(
1187 		lzma_stream *strm, const lzma_mt *options)
1188 		lzma_nothrow lzma_attr_warn_unused_result
1189 		__attribute__((__alias__("lzma_stream_encoder_mt_52")));
1190 
1191 LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
1192 	lzma_ret, lzma_stream_encoder_mt_52)(
1193 		lzma_stream *strm, const lzma_mt *options)
1194 		lzma_nothrow lzma_attr_warn_unused_result;
1195 
1196 #define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
1197 #endif
1198 extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream * strm,const lzma_mt * options)1199 lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1200 {
1201 	lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1202 
1203 	strm->internal->supported_actions[LZMA_RUN] = true;
1204 // 	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1205 	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1206 	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1207 	strm->internal->supported_actions[LZMA_FINISH] = true;
1208 
1209 	return LZMA_OK;
1210 }
1211 
1212 
1213 #ifdef HAVE_SYMBOL_VERSIONS_LINUX
1214 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
1215 	uint64_t, lzma_stream_encoder_mt_memusage_512a)(
1216 	const lzma_mt *options) lzma_nothrow lzma_attr_pure
1217 	__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
1218 
1219 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
1220 	uint64_t, lzma_stream_encoder_mt_memusage_522)(
1221 	const lzma_mt *options) lzma_nothrow lzma_attr_pure
1222 	__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
1223 
1224 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2",
1225 	uint64_t, lzma_stream_encoder_mt_memusage_52)(
1226 	const lzma_mt *options) lzma_nothrow lzma_attr_pure;
1227 
1228 #define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52
1229 #endif
1230 // This function name is a monster but it's consistent with the older
1231 // monster names. :-( 31 chars is the max that C99 requires so in that
1232 // sense it's not too long. ;-)
1233 extern LZMA_API(uint64_t)
lzma_stream_encoder_mt_memusage(const lzma_mt * options)1234 lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1235 {
1236 	lzma_options_easy easy;
1237 	const lzma_filter *filters;
1238 	uint64_t block_size;
1239 	uint64_t outbuf_size_max;
1240 
1241 	if (get_options(options, &easy, &filters, &block_size,
1242 			&outbuf_size_max) != LZMA_OK)
1243 		return UINT64_MAX;
1244 
1245 	// Memory usage of the input buffers
1246 	const uint64_t inbuf_memusage = options->threads * block_size;
1247 
1248 	// Memory usage of the filter encoders
1249 	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1250 	if (filters_memusage == UINT64_MAX)
1251 		return UINT64_MAX;
1252 
1253 	filters_memusage *= options->threads;
1254 
1255 	// Memory usage of the output queue
1256 	const uint64_t outq_memusage = lzma_outq_memusage(
1257 			outbuf_size_max, options->threads);
1258 	if (outq_memusage == UINT64_MAX)
1259 		return UINT64_MAX;
1260 
1261 	// Sum them with overflow checking.
1262 	uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1263 			+ sizeof(lzma_stream_coder)
1264 			+ options->threads * sizeof(worker_thread);
1265 
1266 	if (UINT64_MAX - total_memusage < inbuf_memusage)
1267 		return UINT64_MAX;
1268 
1269 	total_memusage += inbuf_memusage;
1270 
1271 	if (UINT64_MAX - total_memusage < filters_memusage)
1272 		return UINT64_MAX;
1273 
1274 	total_memusage += filters_memusage;
1275 
1276 	if (UINT64_MAX - total_memusage < outq_memusage)
1277 		return UINT64_MAX;
1278 
1279 	return total_memusage + outq_memusage;
1280 }
1281