xref: /freebsd/contrib/xz/src/liblzma/common/outqueue.c (revision 61e21613)
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 /// \file       outqueue.c
4 /// \brief      Output queue handling in multithreaded coding
5 //
6 //  Author:     Lasse Collin
7 //
8 //  This file has been put into the public domain.
9 //  You can do whatever you want with this file.
10 //
11 ///////////////////////////////////////////////////////////////////////////////
12 
13 #include "outqueue.h"
14 
15 
16 /// Get the maximum number of buffers that may be allocated based
17 /// on the number of threads. For now this is twice the number of threads.
18 /// It's a compromise between RAM usage and keeping the worker threads busy
19 /// when buffers finish out of order.
20 #define GET_BUFS_LIMIT(threads) (2 * (threads))
21 
22 
23 extern uint64_t
24 lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads)
25 {
26 	// This is to ease integer overflow checking: We may allocate up to
27 	// GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra
28 	// memory for other data structures too (that's the /2).
29 	//
30 	// lzma_outq_prealloc_buf() will still accept bigger buffers than this.
31 	const uint64_t limit
32 			= UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2;
33 
34 	if (threads > LZMA_THREADS_MAX || buf_size_max > limit)
35 		return UINT64_MAX;
36 
37 	return GET_BUFS_LIMIT(threads)
38 			* lzma_outq_outbuf_memusage(buf_size_max);
39 }
40 
41 
42 static void
43 move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator)
44 {
45 	assert(outq->head != NULL);
46 	assert(outq->tail != NULL);
47 	assert(outq->bufs_in_use > 0);
48 
49 	lzma_outbuf *buf = outq->head;
50 	outq->head = buf->next;
51 	if (outq->head == NULL)
52 		outq->tail = NULL;
53 
54 	if (outq->cache != NULL && outq->cache->allocated != buf->allocated)
55 		lzma_outq_clear_cache(outq, allocator);
56 
57 	buf->next = outq->cache;
58 	outq->cache = buf;
59 
60 	--outq->bufs_in_use;
61 	outq->mem_in_use -= lzma_outq_outbuf_memusage(buf->allocated);
62 
63 	return;
64 }
65 
66 
67 static void
68 free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator)
69 {
70 	assert(outq->cache != NULL);
71 
72 	lzma_outbuf *buf = outq->cache;
73 	outq->cache = buf->next;
74 
75 	--outq->bufs_allocated;
76 	outq->mem_allocated -= lzma_outq_outbuf_memusage(buf->allocated);
77 
78 	lzma_free(buf, allocator);
79 	return;
80 }
81 
82 
83 extern void
84 lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator)
85 {
86 	while (outq->cache != NULL)
87 		free_one_cached_buffer(outq, allocator);
88 
89 	return;
90 }
91 
92 
93 extern void
94 lzma_outq_clear_cache2(lzma_outq *outq, const lzma_allocator *allocator,
95 		size_t keep_size)
96 {
97 	if (outq->cache == NULL)
98 		return;
99 
100 	// Free all but one.
101 	while (outq->cache->next != NULL)
102 		free_one_cached_buffer(outq, allocator);
103 
104 	// Free the last one only if its size doesn't equal to keep_size.
105 	if (outq->cache->allocated != keep_size)
106 		free_one_cached_buffer(outq, allocator);
107 
108 	return;
109 }
110 
111 
112 extern lzma_ret
113 lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator,
114 		uint32_t threads)
115 {
116 	if (threads > LZMA_THREADS_MAX)
117 		return LZMA_OPTIONS_ERROR;
118 
119 	const uint32_t bufs_limit = GET_BUFS_LIMIT(threads);
120 
121 	// Clear head/tail.
122 	while (outq->head != NULL)
123 		move_head_to_cache(outq, allocator);
124 
125 	// If new buf_limit is lower than the old one, we may need to free
126 	// a few cached buffers.
127 	while (bufs_limit < outq->bufs_allocated)
128 		free_one_cached_buffer(outq, allocator);
129 
130 	outq->bufs_limit = bufs_limit;
131 	outq->read_pos = 0;
132 
133 	return LZMA_OK;
134 }
135 
136 
137 extern void
138 lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator)
139 {
140 	while (outq->head != NULL)
141 		move_head_to_cache(outq, allocator);
142 
143 	lzma_outq_clear_cache(outq, allocator);
144 	return;
145 }
146 
147 
148 extern lzma_ret
149 lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator,
150 		size_t size)
151 {
152 	// Caller must have checked it with lzma_outq_has_buf().
153 	assert(outq->bufs_in_use < outq->bufs_limit);
154 
155 	// If there already is appropriately-sized buffer in the cache,
156 	// we need to do nothing.
157 	if (outq->cache != NULL && outq->cache->allocated == size)
158 		return LZMA_OK;
159 
160 	if (size > SIZE_MAX - sizeof(lzma_outbuf))
161 		return LZMA_MEM_ERROR;
162 
163 	const size_t alloc_size = lzma_outq_outbuf_memusage(size);
164 
165 	// The cache may have buffers but their size is wrong.
166 	lzma_outq_clear_cache(outq, allocator);
167 
168 	outq->cache = lzma_alloc(alloc_size, allocator);
169 	if (outq->cache == NULL)
170 		return LZMA_MEM_ERROR;
171 
172 	outq->cache->next = NULL;
173 	outq->cache->allocated = size;
174 
175 	++outq->bufs_allocated;
176 	outq->mem_allocated += alloc_size;
177 
178 	return LZMA_OK;
179 }
180 
181 
182 extern lzma_outbuf *
183 lzma_outq_get_buf(lzma_outq *outq, void *worker)
184 {
185 	// Caller must have used lzma_outq_prealloc_buf() to ensure these.
186 	assert(outq->bufs_in_use < outq->bufs_limit);
187 	assert(outq->bufs_in_use < outq->bufs_allocated);
188 	assert(outq->cache != NULL);
189 
190 	lzma_outbuf *buf = outq->cache;
191 	outq->cache = buf->next;
192 	buf->next = NULL;
193 
194 	if (outq->tail != NULL) {
195 		assert(outq->head != NULL);
196 		outq->tail->next = buf;
197 	} else {
198 		assert(outq->head == NULL);
199 		outq->head = buf;
200 	}
201 
202 	outq->tail = buf;
203 
204 	buf->worker = worker;
205 	buf->finished = false;
206 	buf->finish_ret = LZMA_STREAM_END;
207 	buf->pos = 0;
208 	buf->decoder_in_pos = 0;
209 
210 	buf->unpadded_size = 0;
211 	buf->uncompressed_size = 0;
212 
213 	++outq->bufs_in_use;
214 	outq->mem_in_use += lzma_outq_outbuf_memusage(buf->allocated);
215 
216 	return buf;
217 }
218 
219 
220 extern bool
221 lzma_outq_is_readable(const lzma_outq *outq)
222 {
223 	if (outq->head == NULL)
224 		return false;
225 
226 	return outq->read_pos < outq->head->pos || outq->head->finished;
227 }
228 
229 
230 extern lzma_ret
231 lzma_outq_read(lzma_outq *restrict outq,
232 		const lzma_allocator *restrict allocator,
233 		uint8_t *restrict out, size_t *restrict out_pos,
234 		size_t out_size,
235 		lzma_vli *restrict unpadded_size,
236 		lzma_vli *restrict uncompressed_size)
237 {
238 	// There must be at least one buffer from which to read.
239 	if (outq->bufs_in_use == 0)
240 		return LZMA_OK;
241 
242 	// Get the buffer.
243 	lzma_outbuf *buf = outq->head;
244 
245 	// Copy from the buffer to output.
246 	//
247 	// FIXME? In threaded decoder it may be bad to do this copy while
248 	// the mutex is being held.
249 	lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos,
250 			out, out_pos, out_size);
251 
252 	// Return if we didn't get all the data from the buffer.
253 	if (!buf->finished || outq->read_pos < buf->pos)
254 		return LZMA_OK;
255 
256 	// The buffer was finished. Tell the caller its size information.
257 	if (unpadded_size != NULL)
258 		*unpadded_size = buf->unpadded_size;
259 
260 	if (uncompressed_size != NULL)
261 		*uncompressed_size = buf->uncompressed_size;
262 
263 	// Remember the return value.
264 	const lzma_ret finish_ret = buf->finish_ret;
265 
266 	// Free this buffer for further use.
267 	move_head_to_cache(outq, allocator);
268 	outq->read_pos = 0;
269 
270 	return finish_ret;
271 }
272 
273 
274 extern void
275 lzma_outq_enable_partial_output(lzma_outq *outq,
276 		void (*enable_partial_output)(void *worker))
277 {
278 	if (outq->head != NULL && !outq->head->finished
279 			&& outq->head->worker != NULL) {
280 		enable_partial_output(outq->head->worker);
281 
282 		// Set it to NULL since calling it twice is pointless.
283 		outq->head->worker = NULL;
284 	}
285 
286 	return;
287 }
288