1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 /// \file       stream_encoder_mt.c
4 /// \brief      Multithreaded .xz Stream encoder
5 //
6 //  Author:     Lasse Collin
7 //
8 //  This file has been put into the public domain.
9 //  You can do whatever you want with this file.
10 //
11 ///////////////////////////////////////////////////////////////////////////////
12 
13 #include "filter_encoder.h"
14 #include "easy_preset.h"
15 #include "block_encoder.h"
16 #include "block_buffer_encoder.h"
17 #include "index_encoder.h"
18 #include "outqueue.h"
19 
20 
21 /// Maximum supported block size. This makes it simpler to prevent integer
22 /// overflows if we are given unusually large block size.
23 #define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
24 
25 
26 typedef enum {
27 	/// Waiting for work.
28 	THR_IDLE,
29 
30 	/// Encoding is in progress.
31 	THR_RUN,
32 
33 	/// Encoding is in progress but no more input data will
34 	/// be read.
35 	THR_FINISH,
36 
37 	/// The main thread wants the thread to stop whatever it was doing
38 	/// but not exit.
39 	THR_STOP,
40 
41 	/// The main thread wants the thread to exit. We could use
42 	/// cancellation but since there's stopped anyway, this is lazier.
43 	THR_EXIT,
44 
45 } worker_state;
46 
47 
48 typedef struct worker_thread_s worker_thread;
49 struct worker_thread_s {
50 	worker_state state;
51 
52 	/// Input buffer of coder->block_size bytes. The main thread will
53 	/// put new input into this and update in_size accordingly. Once
54 	/// no more input is coming, state will be set to THR_FINISH.
55 	uint8_t *in;
56 
57 	/// Amount of data available in the input buffer. This is modified
58 	/// only by the main thread.
59 	size_t in_size;
60 
61 	/// Output buffer for this thread. This is set by the main
62 	/// thread every time a new Block is started with this thread
63 	/// structure.
64 	lzma_outbuf *outbuf;
65 
66 	/// Pointer to the main structure is needed when putting this
67 	/// thread back to the stack of free threads.
68 	lzma_coder *coder;
69 
70 	/// The allocator is set by the main thread. Since a copy of the
71 	/// pointer is kept here, the application must not change the
72 	/// allocator before calling lzma_end().
73 	const lzma_allocator *allocator;
74 
75 	/// Amount of uncompressed data that has already been compressed.
76 	uint64_t progress_in;
77 
78 	/// Amount of compressed data that is ready.
79 	uint64_t progress_out;
80 
81 	/// Block encoder
82 	lzma_next_coder block_encoder;
83 
84 	/// Compression options for this Block
85 	lzma_block block_options;
86 
87 	/// Next structure in the stack of free worker threads.
88 	worker_thread *next;
89 
90 	mythread_mutex mutex;
91 	mythread_cond cond;
92 
93 	/// The ID of this thread is used to join the thread
94 	/// when it's not needed anymore.
95 	mythread thread_id;
96 };
97 
98 
99 struct lzma_coder_s {
100 	enum {
101 		SEQ_STREAM_HEADER,
102 		SEQ_BLOCK,
103 		SEQ_INDEX,
104 		SEQ_STREAM_FOOTER,
105 	} sequence;
106 
107 	/// Start a new Block every block_size bytes of input unless
108 	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
109 	size_t block_size;
110 
111 	/// The filter chain currently in use
112 	lzma_filter filters[LZMA_FILTERS_MAX + 1];
113 
114 
115 	/// Index to hold sizes of the Blocks
116 	lzma_index *index;
117 
118 	/// Index encoder
119 	lzma_next_coder index_encoder;
120 
121 
122 	/// Stream Flags for encoding the Stream Header and Stream Footer.
123 	lzma_stream_flags stream_flags;
124 
125 	/// Buffer to hold Stream Header and Stream Footer.
126 	uint8_t header[LZMA_STREAM_HEADER_SIZE];
127 
128 	/// Read position in header[]
129 	size_t header_pos;
130 
131 
132 	/// Output buffer queue for compressed data
133 	lzma_outq outq;
134 
135 
136 	/// Maximum wait time if cannot use all the input and cannot
137 	/// fill the output buffer. This is in milliseconds.
138 	uint32_t timeout;
139 
140 
141 	/// Error code from a worker thread
142 	lzma_ret thread_error;
143 
144 	/// Array of allocated thread-specific structures
145 	worker_thread *threads;
146 
147 	/// Number of structures in "threads" above. This is also the
148 	/// number of threads that will be created at maximum.
149 	uint32_t threads_max;
150 
151 	/// Number of thread structures that have been initialized, and
152 	/// thus the number of worker threads actually created so far.
153 	uint32_t threads_initialized;
154 
155 	/// Stack of free threads. When a thread finishes, it puts itself
156 	/// back into this stack. This starts as empty because threads
157 	/// are created only when actually needed.
158 	worker_thread *threads_free;
159 
160 	/// The most recent worker thread to which the main thread writes
161 	/// the new input from the application.
162 	worker_thread *thr;
163 
164 
165 	/// Amount of uncompressed data in Blocks that have already
166 	/// been finished.
167 	uint64_t progress_in;
168 
169 	/// Amount of compressed data in Stream Header + Blocks that
170 	/// have already been finished.
171 	uint64_t progress_out;
172 
173 
174 	mythread_mutex mutex;
175 	mythread_cond cond;
176 };
177 
178 
179 /// Tell the main thread that something has gone wrong.
180 static void
181 worker_error(worker_thread *thr, lzma_ret ret)
182 {
183 	assert(ret != LZMA_OK);
184 	assert(ret != LZMA_STREAM_END);
185 
186 	mythread_sync(thr->coder->mutex) {
187 		if (thr->coder->thread_error == LZMA_OK)
188 			thr->coder->thread_error = ret;
189 
190 		mythread_cond_signal(&thr->coder->cond);
191 	}
192 
193 	return;
194 }
195 
196 
197 static worker_state
198 worker_encode(worker_thread *thr, worker_state state)
199 {
200 	assert(thr->progress_in == 0);
201 	assert(thr->progress_out == 0);
202 
203 	// Set the Block options.
204 	thr->block_options = (lzma_block){
205 		.version = 0,
206 		.check = thr->coder->stream_flags.check,
207 		.compressed_size = thr->coder->outq.buf_size_max,
208 		.uncompressed_size = thr->coder->block_size,
209 
210 		// TODO: To allow changing the filter chain, the filters
211 		// array must be copied to each worker_thread.
212 		.filters = thr->coder->filters,
213 	};
214 
215 	// Calculate maximum size of the Block Header. This amount is
216 	// reserved in the beginning of the buffer so that Block Header
217 	// along with Compressed Size and Uncompressed Size can be
218 	// written there.
219 	lzma_ret ret = lzma_block_header_size(&thr->block_options);
220 	if (ret != LZMA_OK) {
221 		worker_error(thr, ret);
222 		return THR_STOP;
223 	}
224 
225 	// Initialize the Block encoder.
226 	ret = lzma_block_encoder_init(&thr->block_encoder,
227 			thr->allocator, &thr->block_options);
228 	if (ret != LZMA_OK) {
229 		worker_error(thr, ret);
230 		return THR_STOP;
231 	}
232 
233 	size_t in_pos = 0;
234 	size_t in_size = 0;
235 
236 	thr->outbuf->size = thr->block_options.header_size;
237 	const size_t out_size = thr->coder->outq.buf_size_max;
238 
239 	do {
240 		mythread_sync(thr->mutex) {
241 			// Store in_pos and out_pos into *thr so that
242 			// an application may read them via
243 			// lzma_get_progress() to get progress information.
244 			//
245 			// NOTE: These aren't updated when the encoding
246 			// finishes. Instead, the final values are taken
247 			// later from thr->outbuf.
248 			thr->progress_in = in_pos;
249 			thr->progress_out = thr->outbuf->size;
250 
251 			while (in_size == thr->in_size
252 					&& thr->state == THR_RUN)
253 				mythread_cond_wait(&thr->cond, &thr->mutex);
254 
255 			state = thr->state;
256 			in_size = thr->in_size;
257 		}
258 
259 		// Return if we were asked to stop or exit.
260 		if (state >= THR_STOP)
261 			return state;
262 
263 		lzma_action action = state == THR_FINISH
264 				? LZMA_FINISH : LZMA_RUN;
265 
266 		// Limit the amount of input given to the Block encoder
267 		// at once. This way this thread can react fairly quickly
268 		// if the main thread wants us to stop or exit.
269 		static const size_t in_chunk_max = 16384;
270 		size_t in_limit = in_size;
271 		if (in_size - in_pos > in_chunk_max) {
272 			in_limit = in_pos + in_chunk_max;
273 			action = LZMA_RUN;
274 		}
275 
276 		ret = thr->block_encoder.code(
277 				thr->block_encoder.coder, thr->allocator,
278 				thr->in, &in_pos, in_limit, thr->outbuf->buf,
279 				&thr->outbuf->size, out_size, action);
280 	} while (ret == LZMA_OK && thr->outbuf->size < out_size);
281 
282 	switch (ret) {
283 	case LZMA_STREAM_END:
284 		assert(state == THR_FINISH);
285 
286 		// Encode the Block Header. By doing it after
287 		// the compression, we can store the Compressed Size
288 		// and Uncompressed Size fields.
289 		ret = lzma_block_header_encode(&thr->block_options,
290 				thr->outbuf->buf);
291 		if (ret != LZMA_OK) {
292 			worker_error(thr, ret);
293 			return THR_STOP;
294 		}
295 
296 		break;
297 
298 	case LZMA_OK:
299 		// The data was incompressible. Encode it using uncompressed
300 		// LZMA2 chunks.
301 		//
302 		// First wait that we have gotten all the input.
303 		mythread_sync(thr->mutex) {
304 			while (thr->state == THR_RUN)
305 				mythread_cond_wait(&thr->cond, &thr->mutex);
306 
307 			state = thr->state;
308 			in_size = thr->in_size;
309 		}
310 
311 		if (state >= THR_STOP)
312 			return state;
313 
314 		// Do the encoding. This takes care of the Block Header too.
315 		thr->outbuf->size = 0;
316 		ret = lzma_block_uncomp_encode(&thr->block_options,
317 				thr->in, in_size, thr->outbuf->buf,
318 				&thr->outbuf->size, out_size);
319 
320 		// It shouldn't fail.
321 		if (ret != LZMA_OK) {
322 			worker_error(thr, LZMA_PROG_ERROR);
323 			return THR_STOP;
324 		}
325 
326 		break;
327 
328 	default:
329 		worker_error(thr, ret);
330 		return THR_STOP;
331 	}
332 
333 	// Set the size information that will be read by the main thread
334 	// to write the Index field.
335 	thr->outbuf->unpadded_size
336 			= lzma_block_unpadded_size(&thr->block_options);
337 	assert(thr->outbuf->unpadded_size != 0);
338 	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
339 
340 	return THR_FINISH;
341 }
342 
343 
344 static MYTHREAD_RET_TYPE
345 worker_start(void *thr_ptr)
346 {
347 	worker_thread *thr = thr_ptr;
348 	worker_state state = THR_IDLE; // Init to silence a warning
349 
350 	while (true) {
351 		// Wait for work.
352 		mythread_sync(thr->mutex) {
353 			while (true) {
354 				// The thread is already idle so if we are
355 				// requested to stop, just set the state.
356 				if (thr->state == THR_STOP) {
357 					thr->state = THR_IDLE;
358 					mythread_cond_signal(&thr->cond);
359 				}
360 
361 				state = thr->state;
362 				if (state != THR_IDLE)
363 					break;
364 
365 				mythread_cond_wait(&thr->cond, &thr->mutex);
366 			}
367 		}
368 
369 		assert(state != THR_IDLE);
370 		assert(state != THR_STOP);
371 
372 		if (state <= THR_FINISH)
373 			state = worker_encode(thr, state);
374 
375 		if (state == THR_EXIT)
376 			break;
377 
378 		// Mark the thread as idle unless the main thread has
379 		// told us to exit. Signal is needed for the case
380 		// where the main thread is waiting for the threads to stop.
381 		mythread_sync(thr->mutex) {
382 			if (thr->state != THR_EXIT) {
383 				thr->state = THR_IDLE;
384 				mythread_cond_signal(&thr->cond);
385 			}
386 		}
387 
388 		mythread_sync(thr->coder->mutex) {
389 			// Mark the output buffer as finished if
390 			// no errors occurred.
391 			thr->outbuf->finished = state == THR_FINISH;
392 
393 			// Update the main progress info.
394 			thr->coder->progress_in
395 					+= thr->outbuf->uncompressed_size;
396 			thr->coder->progress_out += thr->outbuf->size;
397 			thr->progress_in = 0;
398 			thr->progress_out = 0;
399 
400 			// Return this thread to the stack of free threads.
401 			thr->next = thr->coder->threads_free;
402 			thr->coder->threads_free = thr;
403 
404 			mythread_cond_signal(&thr->coder->cond);
405 		}
406 	}
407 
408 	// Exiting, free the resources.
409 	mythread_mutex_destroy(&thr->mutex);
410 	mythread_cond_destroy(&thr->cond);
411 
412 	lzma_next_end(&thr->block_encoder, thr->allocator);
413 	lzma_free(thr->in, thr->allocator);
414 	return MYTHREAD_RET_VALUE;
415 }
416 
417 
418 /// Make the threads stop but not exit. Optionally wait for them to stop.
419 static void
420 threads_stop(lzma_coder *coder, bool wait_for_threads)
421 {
422 	// Tell the threads to stop.
423 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
424 		mythread_sync(coder->threads[i].mutex) {
425 			coder->threads[i].state = THR_STOP;
426 			mythread_cond_signal(&coder->threads[i].cond);
427 		}
428 	}
429 
430 	if (!wait_for_threads)
431 		return;
432 
433 	// Wait for the threads to settle in the idle state.
434 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
435 		mythread_sync(coder->threads[i].mutex) {
436 			while (coder->threads[i].state != THR_IDLE)
437 				mythread_cond_wait(&coder->threads[i].cond,
438 						&coder->threads[i].mutex);
439 		}
440 	}
441 
442 	return;
443 }
444 
445 
446 /// Stop the threads and free the resources associated with them.
447 /// Wait until the threads have exited.
448 static void
449 threads_end(lzma_coder *coder, const lzma_allocator *allocator)
450 {
451 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
452 		mythread_sync(coder->threads[i].mutex) {
453 			coder->threads[i].state = THR_EXIT;
454 			mythread_cond_signal(&coder->threads[i].cond);
455 		}
456 	}
457 
458 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
459 		int ret = mythread_join(coder->threads[i].thread_id);
460 		assert(ret == 0);
461 		(void)ret;
462 	}
463 
464 	lzma_free(coder->threads, allocator);
465 	return;
466 }
467 
468 
469 /// Initialize a new worker_thread structure and create a new thread.
470 static lzma_ret
471 initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
472 {
473 	worker_thread *thr = &coder->threads[coder->threads_initialized];
474 
475 	thr->in = lzma_alloc(coder->block_size, allocator);
476 	if (thr->in == NULL)
477 		return LZMA_MEM_ERROR;
478 
479 	if (mythread_mutex_init(&thr->mutex))
480 		goto error_mutex;
481 
482 	if (mythread_cond_init(&thr->cond))
483 		goto error_cond;
484 
485 	thr->state = THR_IDLE;
486 	thr->allocator = allocator;
487 	thr->coder = coder;
488 	thr->progress_in = 0;
489 	thr->progress_out = 0;
490 	thr->block_encoder = LZMA_NEXT_CODER_INIT;
491 
492 	if (mythread_create(&thr->thread_id, &worker_start, thr))
493 		goto error_thread;
494 
495 	++coder->threads_initialized;
496 	coder->thr = thr;
497 
498 	return LZMA_OK;
499 
500 error_thread:
501 	mythread_cond_destroy(&thr->cond);
502 
503 error_cond:
504 	mythread_mutex_destroy(&thr->mutex);
505 
506 error_mutex:
507 	lzma_free(thr->in, allocator);
508 	return LZMA_MEM_ERROR;
509 }
510 
511 
512 static lzma_ret
513 get_thread(lzma_coder *coder, const lzma_allocator *allocator)
514 {
515 	// If there are no free output subqueues, there is no
516 	// point to try getting a thread.
517 	if (!lzma_outq_has_buf(&coder->outq))
518 		return LZMA_OK;
519 
520 	// If there is a free structure on the stack, use it.
521 	mythread_sync(coder->mutex) {
522 		if (coder->threads_free != NULL) {
523 			coder->thr = coder->threads_free;
524 			coder->threads_free = coder->threads_free->next;
525 		}
526 	}
527 
528 	if (coder->thr == NULL) {
529 		// If there are no uninitialized structures left, return.
530 		if (coder->threads_initialized == coder->threads_max)
531 			return LZMA_OK;
532 
533 		// Initialize a new thread.
534 		return_if_error(initialize_new_thread(coder, allocator));
535 	}
536 
537 	// Reset the parts of the thread state that have to be done
538 	// in the main thread.
539 	mythread_sync(coder->thr->mutex) {
540 		coder->thr->state = THR_RUN;
541 		coder->thr->in_size = 0;
542 		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
543 		mythread_cond_signal(&coder->thr->cond);
544 	}
545 
546 	return LZMA_OK;
547 }
548 
549 
550 static lzma_ret
551 stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator,
552 		const uint8_t *restrict in, size_t *restrict in_pos,
553 		size_t in_size, lzma_action action)
554 {
555 	while (*in_pos < in_size
556 			|| (coder->thr != NULL && action != LZMA_RUN)) {
557 		if (coder->thr == NULL) {
558 			// Get a new thread.
559 			const lzma_ret ret = get_thread(coder, allocator);
560 			if (coder->thr == NULL)
561 				return ret;
562 		}
563 
564 		// Copy the input data to thread's buffer.
565 		size_t thr_in_size = coder->thr->in_size;
566 		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
567 				&thr_in_size, coder->block_size);
568 
569 		// Tell the Block encoder to finish if
570 		//  - it has got block_size bytes of input; or
571 		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
572 		//    or LZMA_FULL_BARRIER was used.
573 		//
574 		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
575 		const bool finish = thr_in_size == coder->block_size
576 				|| (*in_pos == in_size && action != LZMA_RUN);
577 
578 		bool block_error = false;
579 
580 		mythread_sync(coder->thr->mutex) {
581 			if (coder->thr->state == THR_IDLE) {
582 				// Something has gone wrong with the Block
583 				// encoder. It has set coder->thread_error
584 				// which we will read a few lines later.
585 				block_error = true;
586 			} else {
587 				// Tell the Block encoder its new amount
588 				// of input and update the state if needed.
589 				coder->thr->in_size = thr_in_size;
590 
591 				if (finish)
592 					coder->thr->state = THR_FINISH;
593 
594 				mythread_cond_signal(&coder->thr->cond);
595 			}
596 		}
597 
598 		if (block_error) {
599 			lzma_ret ret;
600 
601 			mythread_sync(coder->mutex) {
602 				ret = coder->thread_error;
603 			}
604 
605 			return ret;
606 		}
607 
608 		if (finish)
609 			coder->thr = NULL;
610 	}
611 
612 	return LZMA_OK;
613 }
614 
615 
616 /// Wait until more input can be consumed, more output can be read, or
617 /// an optional timeout is reached.
618 static bool
619 wait_for_work(lzma_coder *coder, mythread_condtime *wait_abs,
620 		bool *has_blocked, bool has_input)
621 {
622 	if (coder->timeout != 0 && !*has_blocked) {
623 		// Every time when stream_encode_mt() is called via
624 		// lzma_code(), *has_blocked starts as false. We set it
625 		// to true here and calculate the absolute time when
626 		// we must return if there's nothing to do.
627 		//
628 		// The idea of *has_blocked is to avoid unneeded calls
629 		// to mythread_condtime_set(), which may do a syscall
630 		// depending on the operating system.
631 		*has_blocked = true;
632 		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
633 	}
634 
635 	bool timed_out = false;
636 
637 	mythread_sync(coder->mutex) {
638 		// There are four things that we wait. If one of them
639 		// becomes possible, we return.
640 		//  - If there is input left, we need to get a free
641 		//    worker thread and an output buffer for it.
642 		//  - Data ready to be read from the output queue.
643 		//  - A worker thread indicates an error.
644 		//  - Time out occurs.
645 		while ((!has_input || coder->threads_free == NULL
646 					|| !lzma_outq_has_buf(&coder->outq))
647 				&& !lzma_outq_is_readable(&coder->outq)
648 				&& coder->thread_error == LZMA_OK
649 				&& !timed_out) {
650 			if (coder->timeout != 0)
651 				timed_out = mythread_cond_timedwait(
652 						&coder->cond, &coder->mutex,
653 						wait_abs) != 0;
654 			else
655 				mythread_cond_wait(&coder->cond,
656 						&coder->mutex);
657 		}
658 	}
659 
660 	return timed_out;
661 }
662 
663 
664 static lzma_ret
665 stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator,
666 		const uint8_t *restrict in, size_t *restrict in_pos,
667 		size_t in_size, uint8_t *restrict out,
668 		size_t *restrict out_pos, size_t out_size, lzma_action action)
669 {
670 	switch (coder->sequence) {
671 	case SEQ_STREAM_HEADER:
672 		lzma_bufcpy(coder->header, &coder->header_pos,
673 				sizeof(coder->header),
674 				out, out_pos, out_size);
675 		if (coder->header_pos < sizeof(coder->header))
676 			return LZMA_OK;
677 
678 		coder->header_pos = 0;
679 		coder->sequence = SEQ_BLOCK;
680 
681 	// Fall through
682 
683 	case SEQ_BLOCK: {
684 		// Initialized to silence warnings.
685 		lzma_vli unpadded_size = 0;
686 		lzma_vli uncompressed_size = 0;
687 		lzma_ret ret = LZMA_OK;
688 
689 		// These are for wait_for_work().
690 		bool has_blocked = false;
691 		mythread_condtime wait_abs;
692 
693 		while (true) {
694 			mythread_sync(coder->mutex) {
695 				// Check for Block encoder errors.
696 				ret = coder->thread_error;
697 				if (ret != LZMA_OK) {
698 					assert(ret != LZMA_STREAM_END);
699 					break;
700 				}
701 
702 				// Try to read compressed data to out[].
703 				ret = lzma_outq_read(&coder->outq,
704 						out, out_pos, out_size,
705 						&unpadded_size,
706 						&uncompressed_size);
707 			}
708 
709 			if (ret == LZMA_STREAM_END) {
710 				// End of Block. Add it to the Index.
711 				ret = lzma_index_append(coder->index,
712 						allocator, unpadded_size,
713 						uncompressed_size);
714 
715 				// If we didn't fill the output buffer yet,
716 				// try to read more data. Maybe the next
717 				// outbuf has been finished already too.
718 				if (*out_pos < out_size)
719 					continue;
720 			}
721 
722 			if (ret != LZMA_OK) {
723 				// coder->thread_error was set or
724 				// lzma_index_append() failed.
725 				threads_stop(coder, false);
726 				return ret;
727 			}
728 
729 			// Try to give uncompressed data to a worker thread.
730 			ret = stream_encode_in(coder, allocator,
731 					in, in_pos, in_size, action);
732 			if (ret != LZMA_OK) {
733 				threads_stop(coder, false);
734 				return ret;
735 			}
736 
737 			// See if we should wait or return.
738 			//
739 			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
740 			if (*in_pos == in_size) {
741 				// LZMA_RUN: More data is probably coming
742 				// so return to let the caller fill the
743 				// input buffer.
744 				if (action == LZMA_RUN)
745 					return LZMA_OK;
746 
747 				// LZMA_FULL_BARRIER: The same as with
748 				// LZMA_RUN but tell the caller that the
749 				// barrier was completed.
750 				if (action == LZMA_FULL_BARRIER)
751 					return LZMA_STREAM_END;
752 
753 				// Finishing or flushing isn't completed until
754 				// all input data has been encoded and copied
755 				// to the output buffer.
756 				if (lzma_outq_is_empty(&coder->outq)) {
757 					// LZMA_FINISH: Continue to encode
758 					// the Index field.
759 					if (action == LZMA_FINISH)
760 						break;
761 
762 					// LZMA_FULL_FLUSH: Return to tell
763 					// the caller that flushing was
764 					// completed.
765 					if (action == LZMA_FULL_FLUSH)
766 						return LZMA_STREAM_END;
767 				}
768 			}
769 
770 			// Return if there is no output space left.
771 			// This check must be done after testing the input
772 			// buffer, because we might want to use a different
773 			// return code.
774 			if (*out_pos == out_size)
775 				return LZMA_OK;
776 
777 			// Neither in nor out has been used completely.
778 			// Wait until there's something we can do.
779 			if (wait_for_work(coder, &wait_abs, &has_blocked,
780 					*in_pos < in_size))
781 				return LZMA_TIMED_OUT;
782 		}
783 
784 		// All Blocks have been encoded and the threads have stopped.
785 		// Prepare to encode the Index field.
786 		return_if_error(lzma_index_encoder_init(
787 				&coder->index_encoder, allocator,
788 				coder->index));
789 		coder->sequence = SEQ_INDEX;
790 
791 		// Update the progress info to take the Index and
792 		// Stream Footer into account. Those are very fast to encode
793 		// so in terms of progress information they can be thought
794 		// to be ready to be copied out.
795 		coder->progress_out += lzma_index_size(coder->index)
796 				+ LZMA_STREAM_HEADER_SIZE;
797 	}
798 
799 	// Fall through
800 
801 	case SEQ_INDEX: {
802 		// Call the Index encoder. It doesn't take any input, so
803 		// those pointers can be NULL.
804 		const lzma_ret ret = coder->index_encoder.code(
805 				coder->index_encoder.coder, allocator,
806 				NULL, NULL, 0,
807 				out, out_pos, out_size, LZMA_RUN);
808 		if (ret != LZMA_STREAM_END)
809 			return ret;
810 
811 		// Encode the Stream Footer into coder->buffer.
812 		coder->stream_flags.backward_size
813 				= lzma_index_size(coder->index);
814 		if (lzma_stream_footer_encode(&coder->stream_flags,
815 				coder->header) != LZMA_OK)
816 			return LZMA_PROG_ERROR;
817 
818 		coder->sequence = SEQ_STREAM_FOOTER;
819 	}
820 
821 	// Fall through
822 
823 	case SEQ_STREAM_FOOTER:
824 		lzma_bufcpy(coder->header, &coder->header_pos,
825 				sizeof(coder->header),
826 				out, out_pos, out_size);
827 		return coder->header_pos < sizeof(coder->header)
828 				? LZMA_OK : LZMA_STREAM_END;
829 	}
830 
831 	assert(0);
832 	return LZMA_PROG_ERROR;
833 }
834 
835 
836 static void
837 stream_encoder_mt_end(lzma_coder *coder, const lzma_allocator *allocator)
838 {
839 	// Threads must be killed before the output queue can be freed.
840 	threads_end(coder, allocator);
841 	lzma_outq_end(&coder->outq, allocator);
842 
843 	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
844 		lzma_free(coder->filters[i].options, allocator);
845 
846 	lzma_next_end(&coder->index_encoder, allocator);
847 	lzma_index_end(coder->index, allocator);
848 
849 	mythread_cond_destroy(&coder->cond);
850 	mythread_mutex_destroy(&coder->mutex);
851 
852 	lzma_free(coder, allocator);
853 	return;
854 }
855 
856 
857 /// Options handling for lzma_stream_encoder_mt_init() and
858 /// lzma_stream_encoder_mt_memusage()
859 static lzma_ret
860 get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
861 		const lzma_filter **filters, uint64_t *block_size,
862 		uint64_t *outbuf_size_max)
863 {
864 	// Validate some of the options.
865 	if (options == NULL)
866 		return LZMA_PROG_ERROR;
867 
868 	if (options->flags != 0 || options->threads == 0
869 			|| options->threads > LZMA_THREADS_MAX)
870 		return LZMA_OPTIONS_ERROR;
871 
872 	if (options->filters != NULL) {
873 		// Filter chain was given, use it as is.
874 		*filters = options->filters;
875 	} else {
876 		// Use a preset.
877 		if (lzma_easy_preset(opt_easy, options->preset))
878 			return LZMA_OPTIONS_ERROR;
879 
880 		*filters = opt_easy->filters;
881 	}
882 
883 	// Block size
884 	if (options->block_size > 0) {
885 		if (options->block_size > BLOCK_SIZE_MAX)
886 			return LZMA_OPTIONS_ERROR;
887 
888 		*block_size = options->block_size;
889 	} else {
890 		// Determine the Block size from the filter chain.
891 		*block_size = lzma_mt_block_size(*filters);
892 		if (*block_size == 0)
893 			return LZMA_OPTIONS_ERROR;
894 
895 		assert(*block_size <= BLOCK_SIZE_MAX);
896 	}
897 
898 	// Calculate the maximum amount output that a single output buffer
899 	// may need to hold. This is the same as the maximum total size of
900 	// a Block.
901 	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
902 	if (*outbuf_size_max == 0)
903 		return LZMA_MEM_ERROR;
904 
905 	return LZMA_OK;
906 }
907 
908 
909 static void
910 get_progress(lzma_coder *coder, uint64_t *progress_in, uint64_t *progress_out)
911 {
912 	// Lock coder->mutex to prevent finishing threads from moving their
913 	// progress info from the worker_thread structure to lzma_coder.
914 	mythread_sync(coder->mutex) {
915 		*progress_in = coder->progress_in;
916 		*progress_out = coder->progress_out;
917 
918 		for (size_t i = 0; i < coder->threads_initialized; ++i) {
919 			mythread_sync(coder->threads[i].mutex) {
920 				*progress_in += coder->threads[i].progress_in;
921 				*progress_out += coder->threads[i]
922 						.progress_out;
923 			}
924 		}
925 	}
926 
927 	return;
928 }
929 
930 
931 static lzma_ret
932 stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
933 		const lzma_mt *options)
934 {
935 	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
936 
937 	// Get the filter chain.
938 	lzma_options_easy easy;
939 	const lzma_filter *filters;
940 	uint64_t block_size;
941 	uint64_t outbuf_size_max;
942 	return_if_error(get_options(options, &easy, &filters,
943 			&block_size, &outbuf_size_max));
944 
945 #if SIZE_MAX < UINT64_MAX
946 	if (block_size > SIZE_MAX)
947 		return LZMA_MEM_ERROR;
948 #endif
949 
950 	// Validate the filter chain so that we can give an error in this
951 	// function instead of delaying it to the first call to lzma_code().
952 	// The memory usage calculation verifies the filter chain as
953 	// a side effect so we take advatange of that.
954 	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
955 		return LZMA_OPTIONS_ERROR;
956 
957 	// Validate the Check ID.
958 	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
959 		return LZMA_PROG_ERROR;
960 
961 	if (!lzma_check_is_supported(options->check))
962 		return LZMA_UNSUPPORTED_CHECK;
963 
964 	// Allocate and initialize the base structure if needed.
965 	if (next->coder == NULL) {
966 		next->coder = lzma_alloc(sizeof(lzma_coder), allocator);
967 		if (next->coder == NULL)
968 			return LZMA_MEM_ERROR;
969 
970 		// For the mutex and condition variable initializations
971 		// the error handling has to be done here because
972 		// stream_encoder_mt_end() doesn't know if they have
973 		// already been initialized or not.
974 		if (mythread_mutex_init(&next->coder->mutex)) {
975 			lzma_free(next->coder, allocator);
976 			next->coder = NULL;
977 			return LZMA_MEM_ERROR;
978 		}
979 
980 		if (mythread_cond_init(&next->coder->cond)) {
981 			mythread_mutex_destroy(&next->coder->mutex);
982 			lzma_free(next->coder, allocator);
983 			next->coder = NULL;
984 			return LZMA_MEM_ERROR;
985 		}
986 
987 		next->code = &stream_encode_mt;
988 		next->end = &stream_encoder_mt_end;
989 		next->get_progress = &get_progress;
990 // 		next->update = &stream_encoder_mt_update;
991 
992 		next->coder->filters[0].id = LZMA_VLI_UNKNOWN;
993 		next->coder->index_encoder = LZMA_NEXT_CODER_INIT;
994 		next->coder->index = NULL;
995 		memzero(&next->coder->outq, sizeof(next->coder->outq));
996 		next->coder->threads = NULL;
997 		next->coder->threads_max = 0;
998 		next->coder->threads_initialized = 0;
999 	}
1000 
1001 	// Basic initializations
1002 	next->coder->sequence = SEQ_STREAM_HEADER;
1003 	next->coder->block_size = (size_t)(block_size);
1004 	next->coder->thread_error = LZMA_OK;
1005 	next->coder->thr = NULL;
1006 
1007 	// Allocate the thread-specific base structures.
1008 	assert(options->threads > 0);
1009 	if (next->coder->threads_max != options->threads) {
1010 		threads_end(next->coder, allocator);
1011 
1012 		next->coder->threads = NULL;
1013 		next->coder->threads_max = 0;
1014 
1015 		next->coder->threads_initialized = 0;
1016 		next->coder->threads_free = NULL;
1017 
1018 		next->coder->threads = lzma_alloc(
1019 				options->threads * sizeof(worker_thread),
1020 				allocator);
1021 		if (next->coder->threads == NULL)
1022 			return LZMA_MEM_ERROR;
1023 
1024 		next->coder->threads_max = options->threads;
1025 	} else {
1026 		// Reuse the old structures and threads. Tell the running
1027 		// threads to stop and wait until they have stopped.
1028 		threads_stop(next->coder, true);
1029 	}
1030 
1031 	// Output queue
1032 	return_if_error(lzma_outq_init(&next->coder->outq, allocator,
1033 			outbuf_size_max, options->threads));
1034 
1035 	// Timeout
1036 	next->coder->timeout = options->timeout;
1037 
1038 	// Free the old filter chain and copy the new one.
1039 	for (size_t i = 0; next->coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
1040 		lzma_free(next->coder->filters[i].options, allocator);
1041 
1042 	return_if_error(lzma_filters_copy(
1043 			filters, next->coder->filters, allocator));
1044 
1045 	// Index
1046 	lzma_index_end(next->coder->index, allocator);
1047 	next->coder->index = lzma_index_init(allocator);
1048 	if (next->coder->index == NULL)
1049 		return LZMA_MEM_ERROR;
1050 
1051 	// Stream Header
1052 	next->coder->stream_flags.version = 0;
1053 	next->coder->stream_flags.check = options->check;
1054 	return_if_error(lzma_stream_header_encode(
1055 			&next->coder->stream_flags, next->coder->header));
1056 
1057 	next->coder->header_pos = 0;
1058 
1059 	// Progress info
1060 	next->coder->progress_in = 0;
1061 	next->coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1062 
1063 	return LZMA_OK;
1064 }
1065 
1066 
1067 extern LZMA_API(lzma_ret)
1068 lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1069 {
1070 	lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1071 
1072 	strm->internal->supported_actions[LZMA_RUN] = true;
1073 // 	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1074 	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1075 	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1076 	strm->internal->supported_actions[LZMA_FINISH] = true;
1077 
1078 	return LZMA_OK;
1079 }
1080 
1081 
1082 // This function name is a monster but it's consistent with the older
1083 // monster names. :-( 31 chars is the max that C99 requires so in that
1084 // sense it's not too long. ;-)
1085 extern LZMA_API(uint64_t)
1086 lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1087 {
1088 	lzma_options_easy easy;
1089 	const lzma_filter *filters;
1090 	uint64_t block_size;
1091 	uint64_t outbuf_size_max;
1092 
1093 	if (get_options(options, &easy, &filters, &block_size,
1094 			&outbuf_size_max) != LZMA_OK)
1095 		return UINT64_MAX;
1096 
1097 	// Memory usage of the input buffers
1098 	const uint64_t inbuf_memusage = options->threads * block_size;
1099 
1100 	// Memory usage of the filter encoders
1101 	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1102 	if (filters_memusage == UINT64_MAX)
1103 		return UINT64_MAX;
1104 
1105 	filters_memusage *= options->threads;
1106 
1107 	// Memory usage of the output queue
1108 	const uint64_t outq_memusage = lzma_outq_memusage(
1109 			outbuf_size_max, options->threads);
1110 	if (outq_memusage == UINT64_MAX)
1111 		return UINT64_MAX;
1112 
1113 	// Sum them with overflow checking.
1114 	uint64_t total_memusage = LZMA_MEMUSAGE_BASE + sizeof(lzma_coder)
1115 			+ options->threads * sizeof(worker_thread);
1116 
1117 	if (UINT64_MAX - total_memusage < inbuf_memusage)
1118 		return UINT64_MAX;
1119 
1120 	total_memusage += inbuf_memusage;
1121 
1122 	if (UINT64_MAX - total_memusage < filters_memusage)
1123 		return UINT64_MAX;
1124 
1125 	total_memusage += filters_memusage;
1126 
1127 	if (UINT64_MAX - total_memusage < outq_memusage)
1128 		return UINT64_MAX;
1129 
1130 	return total_memusage + outq_memusage;
1131 }
1132