115ab8c86SJohn Marino ///////////////////////////////////////////////////////////////////////////////
215ab8c86SJohn Marino //
315ab8c86SJohn Marino /// \file stream_encoder_mt.c
415ab8c86SJohn Marino /// \brief Multithreaded .xz Stream encoder
515ab8c86SJohn Marino //
615ab8c86SJohn Marino // Author: Lasse Collin
715ab8c86SJohn Marino //
815ab8c86SJohn Marino // This file has been put into the public domain.
915ab8c86SJohn Marino // You can do whatever you want with this file.
1015ab8c86SJohn Marino //
1115ab8c86SJohn Marino ///////////////////////////////////////////////////////////////////////////////
1215ab8c86SJohn Marino
1315ab8c86SJohn Marino #include "filter_encoder.h"
1415ab8c86SJohn Marino #include "easy_preset.h"
1515ab8c86SJohn Marino #include "block_encoder.h"
1615ab8c86SJohn Marino #include "block_buffer_encoder.h"
1715ab8c86SJohn Marino #include "index_encoder.h"
1815ab8c86SJohn Marino #include "outqueue.h"
1915ab8c86SJohn Marino
2015ab8c86SJohn Marino
/// Largest block size this encoder accepts. Capping it at
/// UINT64_MAX / LZMA_THREADS_MAX makes it simpler to prevent integer
/// overflows when an unusually large block size is requested.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
2415ab8c86SJohn Marino
2515ab8c86SJohn Marino
/// States of a worker thread.
///
/// The numeric order of the enumerators matters: code in this file tests
/// states with "<= THR_FINISH" and ">= THR_STOP".
typedef enum {
	/// No Block is assigned; the thread is waiting for work.
	THR_IDLE,

	/// A Block is being encoded.
	THR_RUN,

	/// A Block is being encoded and no more input data will be
	/// read for it.
	THR_FINISH,

	/// The main thread wants the worker to stop whatever it was
	/// doing without exiting the thread.
	THR_STOP,

	/// The main thread wants the worker to exit. Thread cancellation
	/// could be used instead, but as the threads are told to stop
	/// via the state anyway, a dedicated exit state is the lazier
	/// solution.
	THR_EXIT,

} worker_state;
4615ab8c86SJohn Marino
4746a2189dSzrj typedef struct lzma_stream_coder_s lzma_stream_coder;
4815ab8c86SJohn Marino
4915ab8c86SJohn Marino typedef struct worker_thread_s worker_thread;
5015ab8c86SJohn Marino struct worker_thread_s {
5115ab8c86SJohn Marino worker_state state;
5215ab8c86SJohn Marino
5315ab8c86SJohn Marino /// Input buffer of coder->block_size bytes. The main thread will
5415ab8c86SJohn Marino /// put new input into this and update in_size accordingly. Once
5515ab8c86SJohn Marino /// no more input is coming, state will be set to THR_FINISH.
5615ab8c86SJohn Marino uint8_t *in;
5715ab8c86SJohn Marino
5815ab8c86SJohn Marino /// Amount of data available in the input buffer. This is modified
5915ab8c86SJohn Marino /// only by the main thread.
6015ab8c86SJohn Marino size_t in_size;
6115ab8c86SJohn Marino
6215ab8c86SJohn Marino /// Output buffer for this thread. This is set by the main
6315ab8c86SJohn Marino /// thread every time a new Block is started with this thread
6415ab8c86SJohn Marino /// structure.
6515ab8c86SJohn Marino lzma_outbuf *outbuf;
6615ab8c86SJohn Marino
6715ab8c86SJohn Marino /// Pointer to the main structure is needed when putting this
6815ab8c86SJohn Marino /// thread back to the stack of free threads.
6946a2189dSzrj lzma_stream_coder *coder;
7015ab8c86SJohn Marino
7115ab8c86SJohn Marino /// The allocator is set by the main thread. Since a copy of the
7215ab8c86SJohn Marino /// pointer is kept here, the application must not change the
7315ab8c86SJohn Marino /// allocator before calling lzma_end().
7415ab8c86SJohn Marino const lzma_allocator *allocator;
7515ab8c86SJohn Marino
7615ab8c86SJohn Marino /// Amount of uncompressed data that has already been compressed.
7715ab8c86SJohn Marino uint64_t progress_in;
7815ab8c86SJohn Marino
7915ab8c86SJohn Marino /// Amount of compressed data that is ready.
8015ab8c86SJohn Marino uint64_t progress_out;
8115ab8c86SJohn Marino
8215ab8c86SJohn Marino /// Block encoder
8315ab8c86SJohn Marino lzma_next_coder block_encoder;
8415ab8c86SJohn Marino
8515ab8c86SJohn Marino /// Compression options for this Block
8615ab8c86SJohn Marino lzma_block block_options;
8715ab8c86SJohn Marino
8815ab8c86SJohn Marino /// Next structure in the stack of free worker threads.
8915ab8c86SJohn Marino worker_thread *next;
9015ab8c86SJohn Marino
9115ab8c86SJohn Marino mythread_mutex mutex;
9215ab8c86SJohn Marino mythread_cond cond;
9315ab8c86SJohn Marino
9415ab8c86SJohn Marino /// The ID of this thread is used to join the thread
9515ab8c86SJohn Marino /// when it's not needed anymore.
9615ab8c86SJohn Marino mythread thread_id;
9715ab8c86SJohn Marino };
9815ab8c86SJohn Marino
9915ab8c86SJohn Marino
10046a2189dSzrj struct lzma_stream_coder_s {
10115ab8c86SJohn Marino enum {
10215ab8c86SJohn Marino SEQ_STREAM_HEADER,
10315ab8c86SJohn Marino SEQ_BLOCK,
10415ab8c86SJohn Marino SEQ_INDEX,
10515ab8c86SJohn Marino SEQ_STREAM_FOOTER,
10615ab8c86SJohn Marino } sequence;
10715ab8c86SJohn Marino
10815ab8c86SJohn Marino /// Start a new Block every block_size bytes of input unless
10915ab8c86SJohn Marino /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
11015ab8c86SJohn Marino size_t block_size;
11115ab8c86SJohn Marino
11215ab8c86SJohn Marino /// The filter chain currently in use
11315ab8c86SJohn Marino lzma_filter filters[LZMA_FILTERS_MAX + 1];
11415ab8c86SJohn Marino
11515ab8c86SJohn Marino
11615ab8c86SJohn Marino /// Index to hold sizes of the Blocks
11715ab8c86SJohn Marino lzma_index *index;
11815ab8c86SJohn Marino
11915ab8c86SJohn Marino /// Index encoder
12015ab8c86SJohn Marino lzma_next_coder index_encoder;
12115ab8c86SJohn Marino
12215ab8c86SJohn Marino
12315ab8c86SJohn Marino /// Stream Flags for encoding the Stream Header and Stream Footer.
12415ab8c86SJohn Marino lzma_stream_flags stream_flags;
12515ab8c86SJohn Marino
12615ab8c86SJohn Marino /// Buffer to hold Stream Header and Stream Footer.
12715ab8c86SJohn Marino uint8_t header[LZMA_STREAM_HEADER_SIZE];
12815ab8c86SJohn Marino
12915ab8c86SJohn Marino /// Read position in header[]
13015ab8c86SJohn Marino size_t header_pos;
13115ab8c86SJohn Marino
13215ab8c86SJohn Marino
13315ab8c86SJohn Marino /// Output buffer queue for compressed data
13415ab8c86SJohn Marino lzma_outq outq;
13515ab8c86SJohn Marino
13615ab8c86SJohn Marino
13715ab8c86SJohn Marino /// Maximum wait time if cannot use all the input and cannot
13815ab8c86SJohn Marino /// fill the output buffer. This is in milliseconds.
13915ab8c86SJohn Marino uint32_t timeout;
14015ab8c86SJohn Marino
14115ab8c86SJohn Marino
14215ab8c86SJohn Marino /// Error code from a worker thread
14315ab8c86SJohn Marino lzma_ret thread_error;
14415ab8c86SJohn Marino
14515ab8c86SJohn Marino /// Array of allocated thread-specific structures
14615ab8c86SJohn Marino worker_thread *threads;
14715ab8c86SJohn Marino
14815ab8c86SJohn Marino /// Number of structures in "threads" above. This is also the
14915ab8c86SJohn Marino /// number of threads that will be created at maximum.
15015ab8c86SJohn Marino uint32_t threads_max;
15115ab8c86SJohn Marino
15215ab8c86SJohn Marino /// Number of thread structures that have been initialized, and
15315ab8c86SJohn Marino /// thus the number of worker threads actually created so far.
15415ab8c86SJohn Marino uint32_t threads_initialized;
15515ab8c86SJohn Marino
15615ab8c86SJohn Marino /// Stack of free threads. When a thread finishes, it puts itself
15715ab8c86SJohn Marino /// back into this stack. This starts as empty because threads
15815ab8c86SJohn Marino /// are created only when actually needed.
15915ab8c86SJohn Marino worker_thread *threads_free;
16015ab8c86SJohn Marino
16115ab8c86SJohn Marino /// The most recent worker thread to which the main thread writes
16215ab8c86SJohn Marino /// the new input from the application.
16315ab8c86SJohn Marino worker_thread *thr;
16415ab8c86SJohn Marino
16515ab8c86SJohn Marino
16615ab8c86SJohn Marino /// Amount of uncompressed data in Blocks that have already
16715ab8c86SJohn Marino /// been finished.
16815ab8c86SJohn Marino uint64_t progress_in;
16915ab8c86SJohn Marino
17015ab8c86SJohn Marino /// Amount of compressed data in Stream Header + Blocks that
17115ab8c86SJohn Marino /// have already been finished.
17215ab8c86SJohn Marino uint64_t progress_out;
17315ab8c86SJohn Marino
17415ab8c86SJohn Marino
17515ab8c86SJohn Marino mythread_mutex mutex;
17615ab8c86SJohn Marino mythread_cond cond;
17715ab8c86SJohn Marino };
17815ab8c86SJohn Marino
17915ab8c86SJohn Marino
18015ab8c86SJohn Marino /// Tell the main thread that something has gone wrong.
18115ab8c86SJohn Marino static void
worker_error(worker_thread * thr,lzma_ret ret)18215ab8c86SJohn Marino worker_error(worker_thread *thr, lzma_ret ret)
18315ab8c86SJohn Marino {
18415ab8c86SJohn Marino assert(ret != LZMA_OK);
18515ab8c86SJohn Marino assert(ret != LZMA_STREAM_END);
18615ab8c86SJohn Marino
18715ab8c86SJohn Marino mythread_sync(thr->coder->mutex) {
18815ab8c86SJohn Marino if (thr->coder->thread_error == LZMA_OK)
18915ab8c86SJohn Marino thr->coder->thread_error = ret;
19015ab8c86SJohn Marino
19115ab8c86SJohn Marino mythread_cond_signal(&thr->coder->cond);
19215ab8c86SJohn Marino }
19315ab8c86SJohn Marino
19415ab8c86SJohn Marino return;
19515ab8c86SJohn Marino }
19615ab8c86SJohn Marino
19715ab8c86SJohn Marino
19815ab8c86SJohn Marino static worker_state
worker_encode(worker_thread * thr,worker_state state)19915ab8c86SJohn Marino worker_encode(worker_thread *thr, worker_state state)
20015ab8c86SJohn Marino {
20115ab8c86SJohn Marino assert(thr->progress_in == 0);
20215ab8c86SJohn Marino assert(thr->progress_out == 0);
20315ab8c86SJohn Marino
20415ab8c86SJohn Marino // Set the Block options.
20515ab8c86SJohn Marino thr->block_options = (lzma_block){
20615ab8c86SJohn Marino .version = 0,
20715ab8c86SJohn Marino .check = thr->coder->stream_flags.check,
20815ab8c86SJohn Marino .compressed_size = thr->coder->outq.buf_size_max,
20915ab8c86SJohn Marino .uncompressed_size = thr->coder->block_size,
21015ab8c86SJohn Marino
21115ab8c86SJohn Marino // TODO: To allow changing the filter chain, the filters
21215ab8c86SJohn Marino // array must be copied to each worker_thread.
21315ab8c86SJohn Marino .filters = thr->coder->filters,
21415ab8c86SJohn Marino };
21515ab8c86SJohn Marino
21615ab8c86SJohn Marino // Calculate maximum size of the Block Header. This amount is
21715ab8c86SJohn Marino // reserved in the beginning of the buffer so that Block Header
21815ab8c86SJohn Marino // along with Compressed Size and Uncompressed Size can be
21915ab8c86SJohn Marino // written there.
22015ab8c86SJohn Marino lzma_ret ret = lzma_block_header_size(&thr->block_options);
22115ab8c86SJohn Marino if (ret != LZMA_OK) {
22215ab8c86SJohn Marino worker_error(thr, ret);
22315ab8c86SJohn Marino return THR_STOP;
22415ab8c86SJohn Marino }
22515ab8c86SJohn Marino
22615ab8c86SJohn Marino // Initialize the Block encoder.
22715ab8c86SJohn Marino ret = lzma_block_encoder_init(&thr->block_encoder,
22815ab8c86SJohn Marino thr->allocator, &thr->block_options);
22915ab8c86SJohn Marino if (ret != LZMA_OK) {
23015ab8c86SJohn Marino worker_error(thr, ret);
23115ab8c86SJohn Marino return THR_STOP;
23215ab8c86SJohn Marino }
23315ab8c86SJohn Marino
23415ab8c86SJohn Marino size_t in_pos = 0;
23515ab8c86SJohn Marino size_t in_size = 0;
23615ab8c86SJohn Marino
23715ab8c86SJohn Marino thr->outbuf->size = thr->block_options.header_size;
23815ab8c86SJohn Marino const size_t out_size = thr->coder->outq.buf_size_max;
23915ab8c86SJohn Marino
24015ab8c86SJohn Marino do {
24115ab8c86SJohn Marino mythread_sync(thr->mutex) {
24215ab8c86SJohn Marino // Store in_pos and out_pos into *thr so that
24315ab8c86SJohn Marino // an application may read them via
24415ab8c86SJohn Marino // lzma_get_progress() to get progress information.
24515ab8c86SJohn Marino //
24615ab8c86SJohn Marino // NOTE: These aren't updated when the encoding
24715ab8c86SJohn Marino // finishes. Instead, the final values are taken
24815ab8c86SJohn Marino // later from thr->outbuf.
24915ab8c86SJohn Marino thr->progress_in = in_pos;
25015ab8c86SJohn Marino thr->progress_out = thr->outbuf->size;
25115ab8c86SJohn Marino
25215ab8c86SJohn Marino while (in_size == thr->in_size
25315ab8c86SJohn Marino && thr->state == THR_RUN)
25415ab8c86SJohn Marino mythread_cond_wait(&thr->cond, &thr->mutex);
25515ab8c86SJohn Marino
25615ab8c86SJohn Marino state = thr->state;
25715ab8c86SJohn Marino in_size = thr->in_size;
25815ab8c86SJohn Marino }
25915ab8c86SJohn Marino
26015ab8c86SJohn Marino // Return if we were asked to stop or exit.
26115ab8c86SJohn Marino if (state >= THR_STOP)
26215ab8c86SJohn Marino return state;
26315ab8c86SJohn Marino
26415ab8c86SJohn Marino lzma_action action = state == THR_FINISH
26515ab8c86SJohn Marino ? LZMA_FINISH : LZMA_RUN;
26615ab8c86SJohn Marino
26715ab8c86SJohn Marino // Limit the amount of input given to the Block encoder
26815ab8c86SJohn Marino // at once. This way this thread can react fairly quickly
26915ab8c86SJohn Marino // if the main thread wants us to stop or exit.
27015ab8c86SJohn Marino static const size_t in_chunk_max = 16384;
27115ab8c86SJohn Marino size_t in_limit = in_size;
27215ab8c86SJohn Marino if (in_size - in_pos > in_chunk_max) {
27315ab8c86SJohn Marino in_limit = in_pos + in_chunk_max;
27415ab8c86SJohn Marino action = LZMA_RUN;
27515ab8c86SJohn Marino }
27615ab8c86SJohn Marino
27715ab8c86SJohn Marino ret = thr->block_encoder.code(
27815ab8c86SJohn Marino thr->block_encoder.coder, thr->allocator,
27915ab8c86SJohn Marino thr->in, &in_pos, in_limit, thr->outbuf->buf,
28015ab8c86SJohn Marino &thr->outbuf->size, out_size, action);
28115ab8c86SJohn Marino } while (ret == LZMA_OK && thr->outbuf->size < out_size);
28215ab8c86SJohn Marino
28315ab8c86SJohn Marino switch (ret) {
28415ab8c86SJohn Marino case LZMA_STREAM_END:
28515ab8c86SJohn Marino assert(state == THR_FINISH);
28615ab8c86SJohn Marino
28715ab8c86SJohn Marino // Encode the Block Header. By doing it after
28815ab8c86SJohn Marino // the compression, we can store the Compressed Size
28915ab8c86SJohn Marino // and Uncompressed Size fields.
29015ab8c86SJohn Marino ret = lzma_block_header_encode(&thr->block_options,
29115ab8c86SJohn Marino thr->outbuf->buf);
29215ab8c86SJohn Marino if (ret != LZMA_OK) {
29315ab8c86SJohn Marino worker_error(thr, ret);
29415ab8c86SJohn Marino return THR_STOP;
29515ab8c86SJohn Marino }
29615ab8c86SJohn Marino
29715ab8c86SJohn Marino break;
29815ab8c86SJohn Marino
29915ab8c86SJohn Marino case LZMA_OK:
30015ab8c86SJohn Marino // The data was incompressible. Encode it using uncompressed
30115ab8c86SJohn Marino // LZMA2 chunks.
30215ab8c86SJohn Marino //
30315ab8c86SJohn Marino // First wait that we have gotten all the input.
30415ab8c86SJohn Marino mythread_sync(thr->mutex) {
30515ab8c86SJohn Marino while (thr->state == THR_RUN)
30615ab8c86SJohn Marino mythread_cond_wait(&thr->cond, &thr->mutex);
30715ab8c86SJohn Marino
30815ab8c86SJohn Marino state = thr->state;
30915ab8c86SJohn Marino in_size = thr->in_size;
31015ab8c86SJohn Marino }
31115ab8c86SJohn Marino
31215ab8c86SJohn Marino if (state >= THR_STOP)
31315ab8c86SJohn Marino return state;
31415ab8c86SJohn Marino
31515ab8c86SJohn Marino // Do the encoding. This takes care of the Block Header too.
31615ab8c86SJohn Marino thr->outbuf->size = 0;
31715ab8c86SJohn Marino ret = lzma_block_uncomp_encode(&thr->block_options,
31815ab8c86SJohn Marino thr->in, in_size, thr->outbuf->buf,
31915ab8c86SJohn Marino &thr->outbuf->size, out_size);
32015ab8c86SJohn Marino
32115ab8c86SJohn Marino // It shouldn't fail.
32215ab8c86SJohn Marino if (ret != LZMA_OK) {
32315ab8c86SJohn Marino worker_error(thr, LZMA_PROG_ERROR);
32415ab8c86SJohn Marino return THR_STOP;
32515ab8c86SJohn Marino }
32615ab8c86SJohn Marino
32715ab8c86SJohn Marino break;
32815ab8c86SJohn Marino
32915ab8c86SJohn Marino default:
33015ab8c86SJohn Marino worker_error(thr, ret);
33115ab8c86SJohn Marino return THR_STOP;
33215ab8c86SJohn Marino }
33315ab8c86SJohn Marino
33415ab8c86SJohn Marino // Set the size information that will be read by the main thread
33515ab8c86SJohn Marino // to write the Index field.
33615ab8c86SJohn Marino thr->outbuf->unpadded_size
33715ab8c86SJohn Marino = lzma_block_unpadded_size(&thr->block_options);
33815ab8c86SJohn Marino assert(thr->outbuf->unpadded_size != 0);
33915ab8c86SJohn Marino thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
34015ab8c86SJohn Marino
34115ab8c86SJohn Marino return THR_FINISH;
34215ab8c86SJohn Marino }
34315ab8c86SJohn Marino
34415ab8c86SJohn Marino
34515ab8c86SJohn Marino static MYTHREAD_RET_TYPE
worker_start(void * thr_ptr)34615ab8c86SJohn Marino worker_start(void *thr_ptr)
34715ab8c86SJohn Marino {
34815ab8c86SJohn Marino worker_thread *thr = thr_ptr;
34915ab8c86SJohn Marino worker_state state = THR_IDLE; // Init to silence a warning
35015ab8c86SJohn Marino
35115ab8c86SJohn Marino while (true) {
35215ab8c86SJohn Marino // Wait for work.
35315ab8c86SJohn Marino mythread_sync(thr->mutex) {
35415ab8c86SJohn Marino while (true) {
35515ab8c86SJohn Marino // The thread is already idle so if we are
35615ab8c86SJohn Marino // requested to stop, just set the state.
35715ab8c86SJohn Marino if (thr->state == THR_STOP) {
35815ab8c86SJohn Marino thr->state = THR_IDLE;
35915ab8c86SJohn Marino mythread_cond_signal(&thr->cond);
36015ab8c86SJohn Marino }
36115ab8c86SJohn Marino
36215ab8c86SJohn Marino state = thr->state;
36315ab8c86SJohn Marino if (state != THR_IDLE)
36415ab8c86SJohn Marino break;
36515ab8c86SJohn Marino
36615ab8c86SJohn Marino mythread_cond_wait(&thr->cond, &thr->mutex);
36715ab8c86SJohn Marino }
36815ab8c86SJohn Marino }
36915ab8c86SJohn Marino
37015ab8c86SJohn Marino assert(state != THR_IDLE);
37115ab8c86SJohn Marino assert(state != THR_STOP);
37215ab8c86SJohn Marino
37315ab8c86SJohn Marino if (state <= THR_FINISH)
37415ab8c86SJohn Marino state = worker_encode(thr, state);
37515ab8c86SJohn Marino
37615ab8c86SJohn Marino if (state == THR_EXIT)
37715ab8c86SJohn Marino break;
37815ab8c86SJohn Marino
37915ab8c86SJohn Marino // Mark the thread as idle unless the main thread has
38015ab8c86SJohn Marino // told us to exit. Signal is needed for the case
38115ab8c86SJohn Marino // where the main thread is waiting for the threads to stop.
38215ab8c86SJohn Marino mythread_sync(thr->mutex) {
38315ab8c86SJohn Marino if (thr->state != THR_EXIT) {
38415ab8c86SJohn Marino thr->state = THR_IDLE;
38515ab8c86SJohn Marino mythread_cond_signal(&thr->cond);
38615ab8c86SJohn Marino }
38715ab8c86SJohn Marino }
38815ab8c86SJohn Marino
38915ab8c86SJohn Marino mythread_sync(thr->coder->mutex) {
39015ab8c86SJohn Marino // Mark the output buffer as finished if
39115ab8c86SJohn Marino // no errors occurred.
39215ab8c86SJohn Marino thr->outbuf->finished = state == THR_FINISH;
39315ab8c86SJohn Marino
39415ab8c86SJohn Marino // Update the main progress info.
39515ab8c86SJohn Marino thr->coder->progress_in
39615ab8c86SJohn Marino += thr->outbuf->uncompressed_size;
39715ab8c86SJohn Marino thr->coder->progress_out += thr->outbuf->size;
39815ab8c86SJohn Marino thr->progress_in = 0;
39915ab8c86SJohn Marino thr->progress_out = 0;
40015ab8c86SJohn Marino
40115ab8c86SJohn Marino // Return this thread to the stack of free threads.
40215ab8c86SJohn Marino thr->next = thr->coder->threads_free;
40315ab8c86SJohn Marino thr->coder->threads_free = thr;
40415ab8c86SJohn Marino
40515ab8c86SJohn Marino mythread_cond_signal(&thr->coder->cond);
40615ab8c86SJohn Marino }
40715ab8c86SJohn Marino }
40815ab8c86SJohn Marino
40915ab8c86SJohn Marino // Exiting, free the resources.
41015ab8c86SJohn Marino mythread_mutex_destroy(&thr->mutex);
41115ab8c86SJohn Marino mythread_cond_destroy(&thr->cond);
41215ab8c86SJohn Marino
41315ab8c86SJohn Marino lzma_next_end(&thr->block_encoder, thr->allocator);
41415ab8c86SJohn Marino lzma_free(thr->in, thr->allocator);
41515ab8c86SJohn Marino return MYTHREAD_RET_VALUE;
41615ab8c86SJohn Marino }
41715ab8c86SJohn Marino
41815ab8c86SJohn Marino
41915ab8c86SJohn Marino /// Make the threads stop but not exit. Optionally wait for them to stop.
42015ab8c86SJohn Marino static void
threads_stop(lzma_stream_coder * coder,bool wait_for_threads)42146a2189dSzrj threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
42215ab8c86SJohn Marino {
42315ab8c86SJohn Marino // Tell the threads to stop.
42415ab8c86SJohn Marino for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
42515ab8c86SJohn Marino mythread_sync(coder->threads[i].mutex) {
42615ab8c86SJohn Marino coder->threads[i].state = THR_STOP;
42715ab8c86SJohn Marino mythread_cond_signal(&coder->threads[i].cond);
42815ab8c86SJohn Marino }
42915ab8c86SJohn Marino }
43015ab8c86SJohn Marino
43115ab8c86SJohn Marino if (!wait_for_threads)
43215ab8c86SJohn Marino return;
43315ab8c86SJohn Marino
43415ab8c86SJohn Marino // Wait for the threads to settle in the idle state.
43515ab8c86SJohn Marino for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
43615ab8c86SJohn Marino mythread_sync(coder->threads[i].mutex) {
43715ab8c86SJohn Marino while (coder->threads[i].state != THR_IDLE)
43815ab8c86SJohn Marino mythread_cond_wait(&coder->threads[i].cond,
43915ab8c86SJohn Marino &coder->threads[i].mutex);
44015ab8c86SJohn Marino }
44115ab8c86SJohn Marino }
44215ab8c86SJohn Marino
44315ab8c86SJohn Marino return;
44415ab8c86SJohn Marino }
44515ab8c86SJohn Marino
44615ab8c86SJohn Marino
44715ab8c86SJohn Marino /// Stop the threads and free the resources associated with them.
44815ab8c86SJohn Marino /// Wait until the threads have exited.
44915ab8c86SJohn Marino static void
threads_end(lzma_stream_coder * coder,const lzma_allocator * allocator)45046a2189dSzrj threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
45115ab8c86SJohn Marino {
45215ab8c86SJohn Marino for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
45315ab8c86SJohn Marino mythread_sync(coder->threads[i].mutex) {
45415ab8c86SJohn Marino coder->threads[i].state = THR_EXIT;
45515ab8c86SJohn Marino mythread_cond_signal(&coder->threads[i].cond);
45615ab8c86SJohn Marino }
45715ab8c86SJohn Marino }
45815ab8c86SJohn Marino
45915ab8c86SJohn Marino for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
46015ab8c86SJohn Marino int ret = mythread_join(coder->threads[i].thread_id);
46115ab8c86SJohn Marino assert(ret == 0);
46215ab8c86SJohn Marino (void)ret;
46315ab8c86SJohn Marino }
46415ab8c86SJohn Marino
46515ab8c86SJohn Marino lzma_free(coder->threads, allocator);
46615ab8c86SJohn Marino return;
46715ab8c86SJohn Marino }
46815ab8c86SJohn Marino
46915ab8c86SJohn Marino
47015ab8c86SJohn Marino /// Initialize a new worker_thread structure and create a new thread.
47115ab8c86SJohn Marino static lzma_ret
initialize_new_thread(lzma_stream_coder * coder,const lzma_allocator * allocator)47246a2189dSzrj initialize_new_thread(lzma_stream_coder *coder,
47346a2189dSzrj const lzma_allocator *allocator)
47415ab8c86SJohn Marino {
47515ab8c86SJohn Marino worker_thread *thr = &coder->threads[coder->threads_initialized];
47615ab8c86SJohn Marino
47715ab8c86SJohn Marino thr->in = lzma_alloc(coder->block_size, allocator);
47815ab8c86SJohn Marino if (thr->in == NULL)
47915ab8c86SJohn Marino return LZMA_MEM_ERROR;
48015ab8c86SJohn Marino
48115ab8c86SJohn Marino if (mythread_mutex_init(&thr->mutex))
48215ab8c86SJohn Marino goto error_mutex;
48315ab8c86SJohn Marino
48415ab8c86SJohn Marino if (mythread_cond_init(&thr->cond))
48515ab8c86SJohn Marino goto error_cond;
48615ab8c86SJohn Marino
48715ab8c86SJohn Marino thr->state = THR_IDLE;
48815ab8c86SJohn Marino thr->allocator = allocator;
48915ab8c86SJohn Marino thr->coder = coder;
49015ab8c86SJohn Marino thr->progress_in = 0;
49115ab8c86SJohn Marino thr->progress_out = 0;
49215ab8c86SJohn Marino thr->block_encoder = LZMA_NEXT_CODER_INIT;
49315ab8c86SJohn Marino
49415ab8c86SJohn Marino if (mythread_create(&thr->thread_id, &worker_start, thr))
49515ab8c86SJohn Marino goto error_thread;
49615ab8c86SJohn Marino
49715ab8c86SJohn Marino ++coder->threads_initialized;
49815ab8c86SJohn Marino coder->thr = thr;
49915ab8c86SJohn Marino
50015ab8c86SJohn Marino return LZMA_OK;
50115ab8c86SJohn Marino
50215ab8c86SJohn Marino error_thread:
50315ab8c86SJohn Marino mythread_cond_destroy(&thr->cond);
50415ab8c86SJohn Marino
50515ab8c86SJohn Marino error_cond:
50615ab8c86SJohn Marino mythread_mutex_destroy(&thr->mutex);
50715ab8c86SJohn Marino
50815ab8c86SJohn Marino error_mutex:
50915ab8c86SJohn Marino lzma_free(thr->in, allocator);
51015ab8c86SJohn Marino return LZMA_MEM_ERROR;
51115ab8c86SJohn Marino }
51215ab8c86SJohn Marino
51315ab8c86SJohn Marino
51415ab8c86SJohn Marino static lzma_ret
get_thread(lzma_stream_coder * coder,const lzma_allocator * allocator)51546a2189dSzrj get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
51615ab8c86SJohn Marino {
51715ab8c86SJohn Marino // If there are no free output subqueues, there is no
51815ab8c86SJohn Marino // point to try getting a thread.
51915ab8c86SJohn Marino if (!lzma_outq_has_buf(&coder->outq))
52015ab8c86SJohn Marino return LZMA_OK;
52115ab8c86SJohn Marino
52215ab8c86SJohn Marino // If there is a free structure on the stack, use it.
52315ab8c86SJohn Marino mythread_sync(coder->mutex) {
52415ab8c86SJohn Marino if (coder->threads_free != NULL) {
52515ab8c86SJohn Marino coder->thr = coder->threads_free;
52615ab8c86SJohn Marino coder->threads_free = coder->threads_free->next;
52715ab8c86SJohn Marino }
52815ab8c86SJohn Marino }
52915ab8c86SJohn Marino
53015ab8c86SJohn Marino if (coder->thr == NULL) {
53115ab8c86SJohn Marino // If there are no uninitialized structures left, return.
53215ab8c86SJohn Marino if (coder->threads_initialized == coder->threads_max)
53315ab8c86SJohn Marino return LZMA_OK;
53415ab8c86SJohn Marino
53515ab8c86SJohn Marino // Initialize a new thread.
53615ab8c86SJohn Marino return_if_error(initialize_new_thread(coder, allocator));
53715ab8c86SJohn Marino }
53815ab8c86SJohn Marino
53915ab8c86SJohn Marino // Reset the parts of the thread state that have to be done
54015ab8c86SJohn Marino // in the main thread.
54115ab8c86SJohn Marino mythread_sync(coder->thr->mutex) {
54215ab8c86SJohn Marino coder->thr->state = THR_RUN;
54315ab8c86SJohn Marino coder->thr->in_size = 0;
54415ab8c86SJohn Marino coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
54515ab8c86SJohn Marino mythread_cond_signal(&coder->thr->cond);
54615ab8c86SJohn Marino }
54715ab8c86SJohn Marino
54815ab8c86SJohn Marino return LZMA_OK;
54915ab8c86SJohn Marino }
55015ab8c86SJohn Marino
55115ab8c86SJohn Marino
55215ab8c86SJohn Marino static lzma_ret
stream_encode_in(lzma_stream_coder * coder,const lzma_allocator * allocator,const uint8_t * restrict in,size_t * restrict in_pos,size_t in_size,lzma_action action)55346a2189dSzrj stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
55415ab8c86SJohn Marino const uint8_t *restrict in, size_t *restrict in_pos,
55515ab8c86SJohn Marino size_t in_size, lzma_action action)
55615ab8c86SJohn Marino {
55715ab8c86SJohn Marino while (*in_pos < in_size
55815ab8c86SJohn Marino || (coder->thr != NULL && action != LZMA_RUN)) {
55915ab8c86SJohn Marino if (coder->thr == NULL) {
56015ab8c86SJohn Marino // Get a new thread.
56115ab8c86SJohn Marino const lzma_ret ret = get_thread(coder, allocator);
56215ab8c86SJohn Marino if (coder->thr == NULL)
56315ab8c86SJohn Marino return ret;
56415ab8c86SJohn Marino }
56515ab8c86SJohn Marino
56615ab8c86SJohn Marino // Copy the input data to thread's buffer.
56715ab8c86SJohn Marino size_t thr_in_size = coder->thr->in_size;
56815ab8c86SJohn Marino lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
56915ab8c86SJohn Marino &thr_in_size, coder->block_size);
57015ab8c86SJohn Marino
57115ab8c86SJohn Marino // Tell the Block encoder to finish if
57215ab8c86SJohn Marino // - it has got block_size bytes of input; or
57315ab8c86SJohn Marino // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
57415ab8c86SJohn Marino // or LZMA_FULL_BARRIER was used.
57515ab8c86SJohn Marino //
57615ab8c86SJohn Marino // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
57715ab8c86SJohn Marino const bool finish = thr_in_size == coder->block_size
57815ab8c86SJohn Marino || (*in_pos == in_size && action != LZMA_RUN);
57915ab8c86SJohn Marino
58015ab8c86SJohn Marino bool block_error = false;
58115ab8c86SJohn Marino
58215ab8c86SJohn Marino mythread_sync(coder->thr->mutex) {
58315ab8c86SJohn Marino if (coder->thr->state == THR_IDLE) {
58415ab8c86SJohn Marino // Something has gone wrong with the Block
58515ab8c86SJohn Marino // encoder. It has set coder->thread_error
58615ab8c86SJohn Marino // which we will read a few lines later.
58715ab8c86SJohn Marino block_error = true;
58815ab8c86SJohn Marino } else {
58915ab8c86SJohn Marino // Tell the Block encoder its new amount
59015ab8c86SJohn Marino // of input and update the state if needed.
59115ab8c86SJohn Marino coder->thr->in_size = thr_in_size;
59215ab8c86SJohn Marino
59315ab8c86SJohn Marino if (finish)
59415ab8c86SJohn Marino coder->thr->state = THR_FINISH;
59515ab8c86SJohn Marino
59615ab8c86SJohn Marino mythread_cond_signal(&coder->thr->cond);
59715ab8c86SJohn Marino }
59815ab8c86SJohn Marino }
59915ab8c86SJohn Marino
60015ab8c86SJohn Marino if (block_error) {
60115ab8c86SJohn Marino lzma_ret ret;
60215ab8c86SJohn Marino
60315ab8c86SJohn Marino mythread_sync(coder->mutex) {
60415ab8c86SJohn Marino ret = coder->thread_error;
60515ab8c86SJohn Marino }
60615ab8c86SJohn Marino
60715ab8c86SJohn Marino return ret;
60815ab8c86SJohn Marino }
60915ab8c86SJohn Marino
61015ab8c86SJohn Marino if (finish)
61115ab8c86SJohn Marino coder->thr = NULL;
61215ab8c86SJohn Marino }
61315ab8c86SJohn Marino
61415ab8c86SJohn Marino return LZMA_OK;
61515ab8c86SJohn Marino }
61615ab8c86SJohn Marino
61715ab8c86SJohn Marino
61815ab8c86SJohn Marino /// Wait until more input can be consumed, more output can be read, or
61915ab8c86SJohn Marino /// an optional timeout is reached.
62015ab8c86SJohn Marino static bool
wait_for_work(lzma_stream_coder * coder,mythread_condtime * wait_abs,bool * has_blocked,bool has_input)62146a2189dSzrj wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
62215ab8c86SJohn Marino bool *has_blocked, bool has_input)
62315ab8c86SJohn Marino {
62415ab8c86SJohn Marino if (coder->timeout != 0 && !*has_blocked) {
62515ab8c86SJohn Marino // Every time when stream_encode_mt() is called via
62615ab8c86SJohn Marino // lzma_code(), *has_blocked starts as false. We set it
62715ab8c86SJohn Marino // to true here and calculate the absolute time when
62815ab8c86SJohn Marino // we must return if there's nothing to do.
62915ab8c86SJohn Marino //
63015ab8c86SJohn Marino // The idea of *has_blocked is to avoid unneeded calls
63115ab8c86SJohn Marino // to mythread_condtime_set(), which may do a syscall
63215ab8c86SJohn Marino // depending on the operating system.
63315ab8c86SJohn Marino *has_blocked = true;
63415ab8c86SJohn Marino mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
63515ab8c86SJohn Marino }
63615ab8c86SJohn Marino
63715ab8c86SJohn Marino bool timed_out = false;
63815ab8c86SJohn Marino
63915ab8c86SJohn Marino mythread_sync(coder->mutex) {
64015ab8c86SJohn Marino // There are four things that we wait. If one of them
64115ab8c86SJohn Marino // becomes possible, we return.
64215ab8c86SJohn Marino // - If there is input left, we need to get a free
64315ab8c86SJohn Marino // worker thread and an output buffer for it.
64415ab8c86SJohn Marino // - Data ready to be read from the output queue.
64515ab8c86SJohn Marino // - A worker thread indicates an error.
64615ab8c86SJohn Marino // - Time out occurs.
64715ab8c86SJohn Marino while ((!has_input || coder->threads_free == NULL
64815ab8c86SJohn Marino || !lzma_outq_has_buf(&coder->outq))
64915ab8c86SJohn Marino && !lzma_outq_is_readable(&coder->outq)
65015ab8c86SJohn Marino && coder->thread_error == LZMA_OK
65115ab8c86SJohn Marino && !timed_out) {
65215ab8c86SJohn Marino if (coder->timeout != 0)
65315ab8c86SJohn Marino timed_out = mythread_cond_timedwait(
65415ab8c86SJohn Marino &coder->cond, &coder->mutex,
65515ab8c86SJohn Marino wait_abs) != 0;
65615ab8c86SJohn Marino else
65715ab8c86SJohn Marino mythread_cond_wait(&coder->cond,
65815ab8c86SJohn Marino &coder->mutex);
65915ab8c86SJohn Marino }
66015ab8c86SJohn Marino }
66115ab8c86SJohn Marino
66215ab8c86SJohn Marino return timed_out;
66315ab8c86SJohn Marino }
66415ab8c86SJohn Marino
66515ab8c86SJohn Marino
66615ab8c86SJohn Marino static lzma_ret
stream_encode_mt(void * coder_ptr,const lzma_allocator * allocator,const uint8_t * restrict in,size_t * restrict in_pos,size_t in_size,uint8_t * restrict out,size_t * restrict out_pos,size_t out_size,lzma_action action)66746a2189dSzrj stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
66815ab8c86SJohn Marino const uint8_t *restrict in, size_t *restrict in_pos,
66915ab8c86SJohn Marino size_t in_size, uint8_t *restrict out,
67015ab8c86SJohn Marino size_t *restrict out_pos, size_t out_size, lzma_action action)
67115ab8c86SJohn Marino {
67246a2189dSzrj lzma_stream_coder *coder = coder_ptr;
67346a2189dSzrj
67415ab8c86SJohn Marino switch (coder->sequence) {
67515ab8c86SJohn Marino case SEQ_STREAM_HEADER:
67615ab8c86SJohn Marino lzma_bufcpy(coder->header, &coder->header_pos,
67715ab8c86SJohn Marino sizeof(coder->header),
67815ab8c86SJohn Marino out, out_pos, out_size);
67915ab8c86SJohn Marino if (coder->header_pos < sizeof(coder->header))
68015ab8c86SJohn Marino return LZMA_OK;
68115ab8c86SJohn Marino
68215ab8c86SJohn Marino coder->header_pos = 0;
68315ab8c86SJohn Marino coder->sequence = SEQ_BLOCK;
68415ab8c86SJohn Marino
68515ab8c86SJohn Marino // Fall through
68615ab8c86SJohn Marino
68715ab8c86SJohn Marino case SEQ_BLOCK: {
68815ab8c86SJohn Marino // Initialized to silence warnings.
68915ab8c86SJohn Marino lzma_vli unpadded_size = 0;
69015ab8c86SJohn Marino lzma_vli uncompressed_size = 0;
69115ab8c86SJohn Marino lzma_ret ret = LZMA_OK;
69215ab8c86SJohn Marino
69315ab8c86SJohn Marino // These are for wait_for_work().
69415ab8c86SJohn Marino bool has_blocked = false;
69515ab8c86SJohn Marino mythread_condtime wait_abs;
69615ab8c86SJohn Marino
69715ab8c86SJohn Marino while (true) {
69815ab8c86SJohn Marino mythread_sync(coder->mutex) {
69915ab8c86SJohn Marino // Check for Block encoder errors.
70015ab8c86SJohn Marino ret = coder->thread_error;
70115ab8c86SJohn Marino if (ret != LZMA_OK) {
70215ab8c86SJohn Marino assert(ret != LZMA_STREAM_END);
703*e151908bSDaniel Fojt break; // Break out of mythread_sync.
70415ab8c86SJohn Marino }
70515ab8c86SJohn Marino
70615ab8c86SJohn Marino // Try to read compressed data to out[].
70715ab8c86SJohn Marino ret = lzma_outq_read(&coder->outq,
70815ab8c86SJohn Marino out, out_pos, out_size,
70915ab8c86SJohn Marino &unpadded_size,
71015ab8c86SJohn Marino &uncompressed_size);
71115ab8c86SJohn Marino }
71215ab8c86SJohn Marino
71315ab8c86SJohn Marino if (ret == LZMA_STREAM_END) {
71415ab8c86SJohn Marino // End of Block. Add it to the Index.
71515ab8c86SJohn Marino ret = lzma_index_append(coder->index,
71615ab8c86SJohn Marino allocator, unpadded_size,
71715ab8c86SJohn Marino uncompressed_size);
71815ab8c86SJohn Marino
71915ab8c86SJohn Marino // If we didn't fill the output buffer yet,
72015ab8c86SJohn Marino // try to read more data. Maybe the next
72115ab8c86SJohn Marino // outbuf has been finished already too.
72215ab8c86SJohn Marino if (*out_pos < out_size)
72315ab8c86SJohn Marino continue;
72415ab8c86SJohn Marino }
72515ab8c86SJohn Marino
72615ab8c86SJohn Marino if (ret != LZMA_OK) {
72715ab8c86SJohn Marino // coder->thread_error was set or
72815ab8c86SJohn Marino // lzma_index_append() failed.
72915ab8c86SJohn Marino threads_stop(coder, false);
73015ab8c86SJohn Marino return ret;
73115ab8c86SJohn Marino }
73215ab8c86SJohn Marino
73315ab8c86SJohn Marino // Try to give uncompressed data to a worker thread.
73415ab8c86SJohn Marino ret = stream_encode_in(coder, allocator,
73515ab8c86SJohn Marino in, in_pos, in_size, action);
73615ab8c86SJohn Marino if (ret != LZMA_OK) {
73715ab8c86SJohn Marino threads_stop(coder, false);
73815ab8c86SJohn Marino return ret;
73915ab8c86SJohn Marino }
74015ab8c86SJohn Marino
74115ab8c86SJohn Marino // See if we should wait or return.
74215ab8c86SJohn Marino //
74315ab8c86SJohn Marino // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
74415ab8c86SJohn Marino if (*in_pos == in_size) {
74515ab8c86SJohn Marino // LZMA_RUN: More data is probably coming
74615ab8c86SJohn Marino // so return to let the caller fill the
74715ab8c86SJohn Marino // input buffer.
74815ab8c86SJohn Marino if (action == LZMA_RUN)
74915ab8c86SJohn Marino return LZMA_OK;
75015ab8c86SJohn Marino
75115ab8c86SJohn Marino // LZMA_FULL_BARRIER: The same as with
75215ab8c86SJohn Marino // LZMA_RUN but tell the caller that the
75315ab8c86SJohn Marino // barrier was completed.
75415ab8c86SJohn Marino if (action == LZMA_FULL_BARRIER)
75515ab8c86SJohn Marino return LZMA_STREAM_END;
75615ab8c86SJohn Marino
75715ab8c86SJohn Marino // Finishing or flushing isn't completed until
75815ab8c86SJohn Marino // all input data has been encoded and copied
75915ab8c86SJohn Marino // to the output buffer.
76015ab8c86SJohn Marino if (lzma_outq_is_empty(&coder->outq)) {
76115ab8c86SJohn Marino // LZMA_FINISH: Continue to encode
76215ab8c86SJohn Marino // the Index field.
76315ab8c86SJohn Marino if (action == LZMA_FINISH)
76415ab8c86SJohn Marino break;
76515ab8c86SJohn Marino
76615ab8c86SJohn Marino // LZMA_FULL_FLUSH: Return to tell
76715ab8c86SJohn Marino // the caller that flushing was
76815ab8c86SJohn Marino // completed.
76915ab8c86SJohn Marino if (action == LZMA_FULL_FLUSH)
77015ab8c86SJohn Marino return LZMA_STREAM_END;
77115ab8c86SJohn Marino }
77215ab8c86SJohn Marino }
77315ab8c86SJohn Marino
77415ab8c86SJohn Marino // Return if there is no output space left.
77515ab8c86SJohn Marino // This check must be done after testing the input
77615ab8c86SJohn Marino // buffer, because we might want to use a different
77715ab8c86SJohn Marino // return code.
77815ab8c86SJohn Marino if (*out_pos == out_size)
77915ab8c86SJohn Marino return LZMA_OK;
78015ab8c86SJohn Marino
78115ab8c86SJohn Marino // Neither in nor out has been used completely.
78215ab8c86SJohn Marino // Wait until there's something we can do.
78315ab8c86SJohn Marino if (wait_for_work(coder, &wait_abs, &has_blocked,
78415ab8c86SJohn Marino *in_pos < in_size))
78515ab8c86SJohn Marino return LZMA_TIMED_OUT;
78615ab8c86SJohn Marino }
78715ab8c86SJohn Marino
78815ab8c86SJohn Marino // All Blocks have been encoded and the threads have stopped.
78915ab8c86SJohn Marino // Prepare to encode the Index field.
79015ab8c86SJohn Marino return_if_error(lzma_index_encoder_init(
79115ab8c86SJohn Marino &coder->index_encoder, allocator,
79215ab8c86SJohn Marino coder->index));
79315ab8c86SJohn Marino coder->sequence = SEQ_INDEX;
79415ab8c86SJohn Marino
79515ab8c86SJohn Marino // Update the progress info to take the Index and
79615ab8c86SJohn Marino // Stream Footer into account. Those are very fast to encode
79715ab8c86SJohn Marino // so in terms of progress information they can be thought
79815ab8c86SJohn Marino // to be ready to be copied out.
79915ab8c86SJohn Marino coder->progress_out += lzma_index_size(coder->index)
80015ab8c86SJohn Marino + LZMA_STREAM_HEADER_SIZE;
80115ab8c86SJohn Marino }
80215ab8c86SJohn Marino
80315ab8c86SJohn Marino // Fall through
80415ab8c86SJohn Marino
80515ab8c86SJohn Marino case SEQ_INDEX: {
80615ab8c86SJohn Marino // Call the Index encoder. It doesn't take any input, so
80715ab8c86SJohn Marino // those pointers can be NULL.
80815ab8c86SJohn Marino const lzma_ret ret = coder->index_encoder.code(
80915ab8c86SJohn Marino coder->index_encoder.coder, allocator,
81015ab8c86SJohn Marino NULL, NULL, 0,
81115ab8c86SJohn Marino out, out_pos, out_size, LZMA_RUN);
81215ab8c86SJohn Marino if (ret != LZMA_STREAM_END)
81315ab8c86SJohn Marino return ret;
81415ab8c86SJohn Marino
81515ab8c86SJohn Marino // Encode the Stream Footer into coder->buffer.
81615ab8c86SJohn Marino coder->stream_flags.backward_size
81715ab8c86SJohn Marino = lzma_index_size(coder->index);
81815ab8c86SJohn Marino if (lzma_stream_footer_encode(&coder->stream_flags,
81915ab8c86SJohn Marino coder->header) != LZMA_OK)
82015ab8c86SJohn Marino return LZMA_PROG_ERROR;
82115ab8c86SJohn Marino
82215ab8c86SJohn Marino coder->sequence = SEQ_STREAM_FOOTER;
82315ab8c86SJohn Marino }
82415ab8c86SJohn Marino
82515ab8c86SJohn Marino // Fall through
82615ab8c86SJohn Marino
82715ab8c86SJohn Marino case SEQ_STREAM_FOOTER:
82815ab8c86SJohn Marino lzma_bufcpy(coder->header, &coder->header_pos,
82915ab8c86SJohn Marino sizeof(coder->header),
83015ab8c86SJohn Marino out, out_pos, out_size);
83115ab8c86SJohn Marino return coder->header_pos < sizeof(coder->header)
83215ab8c86SJohn Marino ? LZMA_OK : LZMA_STREAM_END;
83315ab8c86SJohn Marino }
83415ab8c86SJohn Marino
83515ab8c86SJohn Marino assert(0);
83615ab8c86SJohn Marino return LZMA_PROG_ERROR;
83715ab8c86SJohn Marino }
83815ab8c86SJohn Marino
83915ab8c86SJohn Marino
84015ab8c86SJohn Marino static void
stream_encoder_mt_end(void * coder_ptr,const lzma_allocator * allocator)84146a2189dSzrj stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
84215ab8c86SJohn Marino {
84346a2189dSzrj lzma_stream_coder *coder = coder_ptr;
84446a2189dSzrj
84515ab8c86SJohn Marino // Threads must be killed before the output queue can be freed.
84615ab8c86SJohn Marino threads_end(coder, allocator);
84715ab8c86SJohn Marino lzma_outq_end(&coder->outq, allocator);
84815ab8c86SJohn Marino
84915ab8c86SJohn Marino for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
85015ab8c86SJohn Marino lzma_free(coder->filters[i].options, allocator);
85115ab8c86SJohn Marino
85215ab8c86SJohn Marino lzma_next_end(&coder->index_encoder, allocator);
85315ab8c86SJohn Marino lzma_index_end(coder->index, allocator);
85415ab8c86SJohn Marino
85515ab8c86SJohn Marino mythread_cond_destroy(&coder->cond);
85615ab8c86SJohn Marino mythread_mutex_destroy(&coder->mutex);
85715ab8c86SJohn Marino
85815ab8c86SJohn Marino lzma_free(coder, allocator);
85915ab8c86SJohn Marino return;
86015ab8c86SJohn Marino }
86115ab8c86SJohn Marino
86215ab8c86SJohn Marino
86315ab8c86SJohn Marino /// Options handling for lzma_stream_encoder_mt_init() and
86415ab8c86SJohn Marino /// lzma_stream_encoder_mt_memusage()
86515ab8c86SJohn Marino static lzma_ret
get_options(const lzma_mt * options,lzma_options_easy * opt_easy,const lzma_filter ** filters,uint64_t * block_size,uint64_t * outbuf_size_max)86615ab8c86SJohn Marino get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
86715ab8c86SJohn Marino const lzma_filter **filters, uint64_t *block_size,
86815ab8c86SJohn Marino uint64_t *outbuf_size_max)
86915ab8c86SJohn Marino {
87015ab8c86SJohn Marino // Validate some of the options.
87115ab8c86SJohn Marino if (options == NULL)
87215ab8c86SJohn Marino return LZMA_PROG_ERROR;
87315ab8c86SJohn Marino
87415ab8c86SJohn Marino if (options->flags != 0 || options->threads == 0
87515ab8c86SJohn Marino || options->threads > LZMA_THREADS_MAX)
87615ab8c86SJohn Marino return LZMA_OPTIONS_ERROR;
87715ab8c86SJohn Marino
87815ab8c86SJohn Marino if (options->filters != NULL) {
87915ab8c86SJohn Marino // Filter chain was given, use it as is.
88015ab8c86SJohn Marino *filters = options->filters;
88115ab8c86SJohn Marino } else {
88215ab8c86SJohn Marino // Use a preset.
88315ab8c86SJohn Marino if (lzma_easy_preset(opt_easy, options->preset))
88415ab8c86SJohn Marino return LZMA_OPTIONS_ERROR;
88515ab8c86SJohn Marino
88615ab8c86SJohn Marino *filters = opt_easy->filters;
88715ab8c86SJohn Marino }
88815ab8c86SJohn Marino
88915ab8c86SJohn Marino // Block size
89015ab8c86SJohn Marino if (options->block_size > 0) {
89115ab8c86SJohn Marino if (options->block_size > BLOCK_SIZE_MAX)
89215ab8c86SJohn Marino return LZMA_OPTIONS_ERROR;
89315ab8c86SJohn Marino
89415ab8c86SJohn Marino *block_size = options->block_size;
89515ab8c86SJohn Marino } else {
89615ab8c86SJohn Marino // Determine the Block size from the filter chain.
89715ab8c86SJohn Marino *block_size = lzma_mt_block_size(*filters);
89815ab8c86SJohn Marino if (*block_size == 0)
89915ab8c86SJohn Marino return LZMA_OPTIONS_ERROR;
90015ab8c86SJohn Marino
90115ab8c86SJohn Marino assert(*block_size <= BLOCK_SIZE_MAX);
90215ab8c86SJohn Marino }
90315ab8c86SJohn Marino
90415ab8c86SJohn Marino // Calculate the maximum amount output that a single output buffer
90515ab8c86SJohn Marino // may need to hold. This is the same as the maximum total size of
90615ab8c86SJohn Marino // a Block.
90715ab8c86SJohn Marino *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
90815ab8c86SJohn Marino if (*outbuf_size_max == 0)
90915ab8c86SJohn Marino return LZMA_MEM_ERROR;
91015ab8c86SJohn Marino
91115ab8c86SJohn Marino return LZMA_OK;
91215ab8c86SJohn Marino }
91315ab8c86SJohn Marino
91415ab8c86SJohn Marino
91515ab8c86SJohn Marino static void
get_progress(void * coder_ptr,uint64_t * progress_in,uint64_t * progress_out)91646a2189dSzrj get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
91715ab8c86SJohn Marino {
91846a2189dSzrj lzma_stream_coder *coder = coder_ptr;
91946a2189dSzrj
92015ab8c86SJohn Marino // Lock coder->mutex to prevent finishing threads from moving their
92146a2189dSzrj // progress info from the worker_thread structure to lzma_stream_coder.
92215ab8c86SJohn Marino mythread_sync(coder->mutex) {
92315ab8c86SJohn Marino *progress_in = coder->progress_in;
92415ab8c86SJohn Marino *progress_out = coder->progress_out;
92515ab8c86SJohn Marino
92615ab8c86SJohn Marino for (size_t i = 0; i < coder->threads_initialized; ++i) {
92715ab8c86SJohn Marino mythread_sync(coder->threads[i].mutex) {
92815ab8c86SJohn Marino *progress_in += coder->threads[i].progress_in;
92915ab8c86SJohn Marino *progress_out += coder->threads[i]
93015ab8c86SJohn Marino .progress_out;
93115ab8c86SJohn Marino }
93215ab8c86SJohn Marino }
93315ab8c86SJohn Marino }
93415ab8c86SJohn Marino
93515ab8c86SJohn Marino return;
93615ab8c86SJohn Marino }
93715ab8c86SJohn Marino
93815ab8c86SJohn Marino
93915ab8c86SJohn Marino static lzma_ret
stream_encoder_mt_init(lzma_next_coder * next,const lzma_allocator * allocator,const lzma_mt * options)94015ab8c86SJohn Marino stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
94115ab8c86SJohn Marino const lzma_mt *options)
94215ab8c86SJohn Marino {
94315ab8c86SJohn Marino lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
94415ab8c86SJohn Marino
94515ab8c86SJohn Marino // Get the filter chain.
94615ab8c86SJohn Marino lzma_options_easy easy;
94715ab8c86SJohn Marino const lzma_filter *filters;
94815ab8c86SJohn Marino uint64_t block_size;
94915ab8c86SJohn Marino uint64_t outbuf_size_max;
95015ab8c86SJohn Marino return_if_error(get_options(options, &easy, &filters,
95115ab8c86SJohn Marino &block_size, &outbuf_size_max));
95215ab8c86SJohn Marino
95315ab8c86SJohn Marino #if SIZE_MAX < UINT64_MAX
95415ab8c86SJohn Marino if (block_size > SIZE_MAX)
95515ab8c86SJohn Marino return LZMA_MEM_ERROR;
95615ab8c86SJohn Marino #endif
95715ab8c86SJohn Marino
95815ab8c86SJohn Marino // Validate the filter chain so that we can give an error in this
95915ab8c86SJohn Marino // function instead of delaying it to the first call to lzma_code().
96015ab8c86SJohn Marino // The memory usage calculation verifies the filter chain as
961*e151908bSDaniel Fojt // a side effect so we take advantage of that.
96215ab8c86SJohn Marino if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
96315ab8c86SJohn Marino return LZMA_OPTIONS_ERROR;
96415ab8c86SJohn Marino
96515ab8c86SJohn Marino // Validate the Check ID.
96615ab8c86SJohn Marino if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
96715ab8c86SJohn Marino return LZMA_PROG_ERROR;
96815ab8c86SJohn Marino
96915ab8c86SJohn Marino if (!lzma_check_is_supported(options->check))
97015ab8c86SJohn Marino return LZMA_UNSUPPORTED_CHECK;
97115ab8c86SJohn Marino
97215ab8c86SJohn Marino // Allocate and initialize the base structure if needed.
97346a2189dSzrj lzma_stream_coder *coder = next->coder;
97446a2189dSzrj if (coder == NULL) {
97546a2189dSzrj coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
97646a2189dSzrj if (coder == NULL)
97715ab8c86SJohn Marino return LZMA_MEM_ERROR;
97815ab8c86SJohn Marino
97946a2189dSzrj next->coder = coder;
98046a2189dSzrj
98115ab8c86SJohn Marino // For the mutex and condition variable initializations
98215ab8c86SJohn Marino // the error handling has to be done here because
98315ab8c86SJohn Marino // stream_encoder_mt_end() doesn't know if they have
98415ab8c86SJohn Marino // already been initialized or not.
98546a2189dSzrj if (mythread_mutex_init(&coder->mutex)) {
98646a2189dSzrj lzma_free(coder, allocator);
98715ab8c86SJohn Marino next->coder = NULL;
98815ab8c86SJohn Marino return LZMA_MEM_ERROR;
98915ab8c86SJohn Marino }
99015ab8c86SJohn Marino
99146a2189dSzrj if (mythread_cond_init(&coder->cond)) {
99246a2189dSzrj mythread_mutex_destroy(&coder->mutex);
99346a2189dSzrj lzma_free(coder, allocator);
99415ab8c86SJohn Marino next->coder = NULL;
99515ab8c86SJohn Marino return LZMA_MEM_ERROR;
99615ab8c86SJohn Marino }
99715ab8c86SJohn Marino
99815ab8c86SJohn Marino next->code = &stream_encode_mt;
99915ab8c86SJohn Marino next->end = &stream_encoder_mt_end;
100015ab8c86SJohn Marino next->get_progress = &get_progress;
100115ab8c86SJohn Marino // next->update = &stream_encoder_mt_update;
100215ab8c86SJohn Marino
100346a2189dSzrj coder->filters[0].id = LZMA_VLI_UNKNOWN;
100446a2189dSzrj coder->index_encoder = LZMA_NEXT_CODER_INIT;
100546a2189dSzrj coder->index = NULL;
100646a2189dSzrj memzero(&coder->outq, sizeof(coder->outq));
100746a2189dSzrj coder->threads = NULL;
100846a2189dSzrj coder->threads_max = 0;
100946a2189dSzrj coder->threads_initialized = 0;
101015ab8c86SJohn Marino }
101115ab8c86SJohn Marino
101215ab8c86SJohn Marino // Basic initializations
101346a2189dSzrj coder->sequence = SEQ_STREAM_HEADER;
101446a2189dSzrj coder->block_size = (size_t)(block_size);
101546a2189dSzrj coder->thread_error = LZMA_OK;
101646a2189dSzrj coder->thr = NULL;
101715ab8c86SJohn Marino
101815ab8c86SJohn Marino // Allocate the thread-specific base structures.
101915ab8c86SJohn Marino assert(options->threads > 0);
102046a2189dSzrj if (coder->threads_max != options->threads) {
102146a2189dSzrj threads_end(coder, allocator);
102215ab8c86SJohn Marino
102346a2189dSzrj coder->threads = NULL;
102446a2189dSzrj coder->threads_max = 0;
102515ab8c86SJohn Marino
102646a2189dSzrj coder->threads_initialized = 0;
102746a2189dSzrj coder->threads_free = NULL;
102815ab8c86SJohn Marino
102946a2189dSzrj coder->threads = lzma_alloc(
103015ab8c86SJohn Marino options->threads * sizeof(worker_thread),
103115ab8c86SJohn Marino allocator);
103246a2189dSzrj if (coder->threads == NULL)
103315ab8c86SJohn Marino return LZMA_MEM_ERROR;
103415ab8c86SJohn Marino
103546a2189dSzrj coder->threads_max = options->threads;
103615ab8c86SJohn Marino } else {
103715ab8c86SJohn Marino // Reuse the old structures and threads. Tell the running
103815ab8c86SJohn Marino // threads to stop and wait until they have stopped.
103946a2189dSzrj threads_stop(coder, true);
104015ab8c86SJohn Marino }
104115ab8c86SJohn Marino
104215ab8c86SJohn Marino // Output queue
104346a2189dSzrj return_if_error(lzma_outq_init(&coder->outq, allocator,
104415ab8c86SJohn Marino outbuf_size_max, options->threads));
104515ab8c86SJohn Marino
104615ab8c86SJohn Marino // Timeout
104746a2189dSzrj coder->timeout = options->timeout;
104815ab8c86SJohn Marino
104915ab8c86SJohn Marino // Free the old filter chain and copy the new one.
105046a2189dSzrj for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
105146a2189dSzrj lzma_free(coder->filters[i].options, allocator);
105215ab8c86SJohn Marino
105315ab8c86SJohn Marino return_if_error(lzma_filters_copy(
105446a2189dSzrj filters, coder->filters, allocator));
105515ab8c86SJohn Marino
105615ab8c86SJohn Marino // Index
105746a2189dSzrj lzma_index_end(coder->index, allocator);
105846a2189dSzrj coder->index = lzma_index_init(allocator);
105946a2189dSzrj if (coder->index == NULL)
106015ab8c86SJohn Marino return LZMA_MEM_ERROR;
106115ab8c86SJohn Marino
106215ab8c86SJohn Marino // Stream Header
106346a2189dSzrj coder->stream_flags.version = 0;
106446a2189dSzrj coder->stream_flags.check = options->check;
106515ab8c86SJohn Marino return_if_error(lzma_stream_header_encode(
106646a2189dSzrj &coder->stream_flags, coder->header));
106715ab8c86SJohn Marino
106846a2189dSzrj coder->header_pos = 0;
106915ab8c86SJohn Marino
107015ab8c86SJohn Marino // Progress info
107146a2189dSzrj coder->progress_in = 0;
107246a2189dSzrj coder->progress_out = LZMA_STREAM_HEADER_SIZE;
107315ab8c86SJohn Marino
107415ab8c86SJohn Marino return LZMA_OK;
107515ab8c86SJohn Marino }
107615ab8c86SJohn Marino
107715ab8c86SJohn Marino
107815ab8c86SJohn Marino extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream * strm,const lzma_mt * options)107915ab8c86SJohn Marino lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
108015ab8c86SJohn Marino {
108115ab8c86SJohn Marino lzma_next_strm_init(stream_encoder_mt_init, strm, options);
108215ab8c86SJohn Marino
108315ab8c86SJohn Marino strm->internal->supported_actions[LZMA_RUN] = true;
108415ab8c86SJohn Marino // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
108515ab8c86SJohn Marino strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
108615ab8c86SJohn Marino strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
108715ab8c86SJohn Marino strm->internal->supported_actions[LZMA_FINISH] = true;
108815ab8c86SJohn Marino
108915ab8c86SJohn Marino return LZMA_OK;
109015ab8c86SJohn Marino }
109115ab8c86SJohn Marino
109215ab8c86SJohn Marino
109315ab8c86SJohn Marino // This function name is a monster but it's consistent with the older
109415ab8c86SJohn Marino // monster names. :-( 31 chars is the max that C99 requires so in that
109515ab8c86SJohn Marino // sense it's not too long. ;-)
109615ab8c86SJohn Marino extern LZMA_API(uint64_t)
lzma_stream_encoder_mt_memusage(const lzma_mt * options)109715ab8c86SJohn Marino lzma_stream_encoder_mt_memusage(const lzma_mt *options)
109815ab8c86SJohn Marino {
109915ab8c86SJohn Marino lzma_options_easy easy;
110015ab8c86SJohn Marino const lzma_filter *filters;
110115ab8c86SJohn Marino uint64_t block_size;
110215ab8c86SJohn Marino uint64_t outbuf_size_max;
110315ab8c86SJohn Marino
110415ab8c86SJohn Marino if (get_options(options, &easy, &filters, &block_size,
110515ab8c86SJohn Marino &outbuf_size_max) != LZMA_OK)
110615ab8c86SJohn Marino return UINT64_MAX;
110715ab8c86SJohn Marino
110815ab8c86SJohn Marino // Memory usage of the input buffers
110915ab8c86SJohn Marino const uint64_t inbuf_memusage = options->threads * block_size;
111015ab8c86SJohn Marino
111115ab8c86SJohn Marino // Memory usage of the filter encoders
111215ab8c86SJohn Marino uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
111315ab8c86SJohn Marino if (filters_memusage == UINT64_MAX)
111415ab8c86SJohn Marino return UINT64_MAX;
111515ab8c86SJohn Marino
111615ab8c86SJohn Marino filters_memusage *= options->threads;
111715ab8c86SJohn Marino
111815ab8c86SJohn Marino // Memory usage of the output queue
111915ab8c86SJohn Marino const uint64_t outq_memusage = lzma_outq_memusage(
112015ab8c86SJohn Marino outbuf_size_max, options->threads);
112115ab8c86SJohn Marino if (outq_memusage == UINT64_MAX)
112215ab8c86SJohn Marino return UINT64_MAX;
112315ab8c86SJohn Marino
112415ab8c86SJohn Marino // Sum them with overflow checking.
112546a2189dSzrj uint64_t total_memusage = LZMA_MEMUSAGE_BASE
112646a2189dSzrj + sizeof(lzma_stream_coder)
112715ab8c86SJohn Marino + options->threads * sizeof(worker_thread);
112815ab8c86SJohn Marino
112915ab8c86SJohn Marino if (UINT64_MAX - total_memusage < inbuf_memusage)
113015ab8c86SJohn Marino return UINT64_MAX;
113115ab8c86SJohn Marino
113215ab8c86SJohn Marino total_memusage += inbuf_memusage;
113315ab8c86SJohn Marino
113415ab8c86SJohn Marino if (UINT64_MAX - total_memusage < filters_memusage)
113515ab8c86SJohn Marino return UINT64_MAX;
113615ab8c86SJohn Marino
113715ab8c86SJohn Marino total_memusage += filters_memusage;
113815ab8c86SJohn Marino
113915ab8c86SJohn Marino if (UINT64_MAX - total_memusage < outq_memusage)
114015ab8c86SJohn Marino return UINT64_MAX;
114115ab8c86SJohn Marino
114215ab8c86SJohn Marino return total_memusage + outq_memusage;
114315ab8c86SJohn Marino }
1144