1 /*
2 * Copyright (c) 2007 Vreixo Formoso
3 *
4 * This file is part of the libisofs project; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License version 2
6 * or later as published by the Free Software Foundation.
7 * See COPYING file for details.
8 */
9
10 /*
11 * Synchronized ring buffer, works with a writer thread and a read thread.
12 *
13 * TODO #00010 : optimize ring buffer
14 * - write/read at the end of buffer requires a second mutex_lock, even if
15 * there's enough space/data at the beginning
16 * - pre-buffer for writes < BLOCK_SIZE
17 *
18 */
19
20 #ifdef HAVE_CONFIG_H
21 #include "../config.h"
22 #endif
23
24 /*
25 Use the copy of the struct burn_source definition in libisofs.h
26 */
27 #define LIBISOFS_WITHOUT_LIBBURN yes
28 #include "libisofs.h"
29
30 #include "buffer.h"
31 #include "ecma119.h"
32
33 #include <pthread.h>
34 #include <string.h>
35
/*
 * Smaller of two values. Both arguments may be evaluated twice, so do not
 * pass expressions with side effects.
 */
#ifndef MIN
#define MIN(a, b) ((a) < (b) ? (a) : (b))
#endif
39
/*
 * State of one ring buffer shared between a single writer and a single
 * reader. The read and write functions of this file access the fields
 * below while holding `mutex`.
 */
struct iso_ring_buffer
{
    /* storage area of `cap` bytes */
    uint8_t *buf;

    /* capacity: maximum number of bytes the buffer can hold */
    size_t cap;

    /* number of bytes currently stored and not yet read */
    size_t size;

    /* next position to read from, as byte offset from buf */
    size_t rpos;

    /* next position to write to, as byte offset from buf */
    size_t wpos;

    /*
     * End-of-activity flags for the reader (rend) and the writer (wend):
     *   0 = not finished, 1 = finished ok, 2 = finished with error
     */
    unsigned int rend :2;
    unsigned int wend :2;

    /* statistics only: how often the writer found the buffer full ... */
    unsigned int times_full;
    /* ... and how often the reader found it empty */
    unsigned int times_empty;

    pthread_mutex_t mutex;

    /* the reader waits on this while there is no data to read */
    pthread_cond_t empty;

    /* the writer waits on this while there is no room to write */
    pthread_cond_t full;
};
73
74 /**
75 * Create a new buffer.
76 *
77 * The created buffer should be freed with iso_ring_buffer_free()
78 *
79 * @param size
80 * Number of blocks in buffer. You should supply a number >= 32, otherwise
81 * size will be ignored and 32 will be used by default, which leads to a
82 * 64 KiB buffer.
83 * @return
84 * 1 success, < 0 error
85 */
iso_ring_buffer_new(size_t size,IsoRingBuffer ** rbuf)86 int iso_ring_buffer_new(size_t size, IsoRingBuffer **rbuf)
87 {
88 IsoRingBuffer *buffer;
89
90 if (rbuf == NULL) {
91 return ISO_NULL_POINTER;
92 }
93
94 buffer = malloc(sizeof(IsoRingBuffer));
95 if (buffer == NULL) {
96 return ISO_OUT_OF_MEM;
97 }
98
99 buffer->cap = (size > 32 ? size : 32) * BLOCK_SIZE;
100 buffer->buf = malloc(buffer->cap);
101 if (buffer->buf == NULL) {
102 free(buffer);
103 return ISO_OUT_OF_MEM;
104 }
105
106 buffer->size = 0;
107 buffer->wpos = 0;
108 buffer->rpos = 0;
109
110 buffer->times_full = 0;
111 buffer->times_empty = 0;
112
113 buffer->rend = buffer->wend = 0;
114
115 /* init mutex and waiting queues */
116 pthread_mutex_init(&buffer->mutex, NULL);
117 pthread_cond_init(&buffer->empty, NULL);
118 pthread_cond_init(&buffer->full, NULL);
119
120 *rbuf = buffer;
121 return ISO_SUCCESS;
122 }
123
/**
 * Dispose a buffer: destroy its mutex and both condition variables, and
 * release the storage area and the buffer object itself.
 *
 * @param buf
 *     The buffer to dispose. NULL is accepted and ignored.
 */
void iso_ring_buffer_free(IsoRingBuffer *buf)
{
    if (buf != NULL) {
        pthread_mutex_destroy(&buf->mutex);
        pthread_cond_destroy(&buf->empty);
        pthread_cond_destroy(&buf->full);
        free(buf->buf);
        free(buf);
    }
}
135
136 /**
137 * Write count bytes into buffer. It blocks until all bytes where written or
138 * reader close the buffer.
139 *
140 * @param buf
141 * the buffer
142 * @param data
143 * pointer to a memory region of at least coun bytes, from which data
144 * will be read.
145 * @param
146 * Number of bytes to write
147 * @return
148 * 1 success, 0 read finished, < 0 error
149 */
iso_ring_buffer_write(IsoRingBuffer * buf,uint8_t * data,size_t count)150 int iso_ring_buffer_write(IsoRingBuffer *buf, uint8_t *data, size_t count)
151 {
152 size_t len;
153 size_t bytes_write = 0;
154
155 if (buf == NULL || data == NULL) {
156 return ISO_NULL_POINTER;
157 }
158
159 while (bytes_write < count) {
160
161 pthread_mutex_lock(&buf->mutex);
162
163 while (buf->size == buf->cap) {
164
165 /*
166 * Note. There's only a writer, so we have no race conditions.
167 * Thus, the while(buf->size == buf->cap) is used here
168 * only to properly detect the reader has been cancelled
169 */
170
171 if (buf->rend) {
172 /* the read procces has been finished */
173 pthread_mutex_unlock(&buf->mutex);
174 return 0;
175 }
176 buf->times_full++;
177 /* wait until space available */
178 pthread_cond_wait(&buf->full, &buf->mutex);
179 }
180
181 len = MIN(count - bytes_write, buf->cap - buf->size);
182 if (buf->wpos + len > buf->cap) {
183 len = buf->cap - buf->wpos;
184 }
185 memcpy(buf->buf + buf->wpos, data + bytes_write, len);
186 buf->wpos = (buf->wpos + len) % (buf->cap);
187 bytes_write += len;
188 buf->size += len;
189
190 /* wake up reader */
191 pthread_cond_signal(&buf->empty);
192 pthread_mutex_unlock(&buf->mutex);
193 }
194 return ISO_SUCCESS;
195 }
196
197 /**
198 * Read count bytes from the buffer into dest. It blocks until the desired
199 * bytes has been read. If the writer finishes before outputting enough
200 * bytes, 0 (EOF) is returned, the number of bytes already read remains
201 * unknown.
202 *
203 * @return
204 * 1 success, 0 EOF, < 0 error
205 */
iso_ring_buffer_read(IsoRingBuffer * buf,uint8_t * dest,size_t count)206 int iso_ring_buffer_read(IsoRingBuffer *buf, uint8_t *dest, size_t count)
207 {
208 size_t len;
209 size_t bytes_read = 0;
210
211 if (buf == NULL || dest == NULL) {
212 return ISO_NULL_POINTER;
213 }
214
215 while (bytes_read < count) {
216 pthread_mutex_lock(&buf->mutex);
217
218 while (buf->size == 0) {
219 /*
220 * Note. There's only a reader, so we have no race conditions.
221 * Thus, the while(buf->size == 0) is used here just to ensure
222 * a reader detects the EOF properly if the writer has been
223 * canceled while the reader was waiting
224 */
225
226 if (buf->wend) {
227 /* the writer procces has been finished */
228 pthread_mutex_unlock(&buf->mutex);
229 return 0; /* EOF */
230 }
231 buf->times_empty++;
232 /* wait until data available */
233 pthread_cond_wait(&buf->empty, &buf->mutex);
234 }
235
236 len = MIN(count - bytes_read, buf->size);
237 if (buf->rpos + len > buf->cap) {
238 len = buf->cap - buf->rpos;
239 }
240 memcpy(dest + bytes_read, buf->buf + buf->rpos, len);
241 buf->rpos = (buf->rpos + len) % (buf->cap);
242 bytes_read += len;
243 buf->size -= len;
244
245 /* wake up the writer */
246 pthread_cond_signal(&buf->full);
247 pthread_mutex_unlock(&buf->mutex);
248 }
249 return ISO_SUCCESS;
250 }
251
iso_ring_buffer_writer_close(IsoRingBuffer * buf,int error)252 void iso_ring_buffer_writer_close(IsoRingBuffer *buf, int error)
253 {
254 pthread_mutex_lock(&buf->mutex);
255 buf->wend = error ? 2 : 1;
256
257 /* ensure no reader is waiting */
258 pthread_cond_signal(&buf->empty);
259 pthread_mutex_unlock(&buf->mutex);
260 }
261
iso_ring_buffer_reader_close(IsoRingBuffer * buf,int error)262 void iso_ring_buffer_reader_close(IsoRingBuffer *buf, int error)
263 {
264 pthread_mutex_lock(&buf->mutex);
265
266 if (buf->rend) {
267 /* reader already closed */
268 pthread_mutex_unlock(&buf->mutex);
269 return;
270 }
271
272 buf->rend = error ? 2 : 1;
273
274 /* ensure no writer is waiting */
275 pthread_cond_signal(&buf->full);
276 pthread_mutex_unlock(&buf->mutex);
277 }
278
279 /**
280 * Get the times the buffer was full.
281 */
iso_ring_buffer_get_times_full(IsoRingBuffer * buf)282 unsigned int iso_ring_buffer_get_times_full(IsoRingBuffer *buf)
283 {
284 return buf->times_full;
285 }
286
287 /**
288 * Get the times the buffer was empty.
289 */
iso_ring_buffer_get_times_empty(IsoRingBuffer * buf)290 unsigned int iso_ring_buffer_get_times_empty(IsoRingBuffer *buf)
291 {
292 return buf->times_empty;
293 }
294
295
296 /** Internal via buffer.h
297 *
298 * Get the status of a ring buffer.
299 *
300 * @param buf
301 * The ring buffer object to inquire
302 * @param size
303 * Will be filled with the total size of the buffer, in bytes
304 * @param free_bytes
305 * Will be filled with the bytes currently available in buffer
306 * @return
307 * < 0 error, > 0 state:
308 * 1="active" : input and consumption are active
309 * 2="ending" : input has ended without error
310 * 3="failing" : input had error and ended,
311 * 5="abandoned" : consumption has ended prematurely
312 * 6="ended" : consumption has ended without input error
313 * 7="aborted" : consumption has ended after input error
314 */
iso_ring_buffer_get_buf_status(IsoRingBuffer * buf,size_t * size,size_t * free_bytes)315 int iso_ring_buffer_get_buf_status(IsoRingBuffer *buf, size_t *size,
316 size_t *free_bytes)
317 {
318 int ret;
319
320 if (buf == NULL) {
321 return ISO_NULL_POINTER;
322 }
323
324 /* get mutex */
325 pthread_mutex_lock(&buf->mutex);
326 if (size) {
327 *size = buf->cap;
328 }
329 if (free_bytes) {
330 *free_bytes = buf->cap - buf->size;
331 }
332
333 ret = (buf->rend ? 4 : 0) + (buf->wend + 1);
334
335 pthread_mutex_unlock(&buf->mutex);
336 return ret;
337 }
338
339 /** API via libisofs.h
340 *
341 * Get the status of the buffer used by a burn_source.
342 *
343 * @param b
344 * A burn_source previously obtained with
345 * iso_image_create_burn_source().
346 * @param size
347 * Will be filled with the total size of the buffer, in bytes
348 * @param free_bytes
349 * Will be filled with the bytes currently available in buffer
350 * @return
351 * < 0 error, > 0 state:
352 * 1="active" : input and consumption are active
353 * 2="ending" : input has ended without error
354 * 3="failing" : input had error and ended,
355 * 5="abandoned" : consumption has ended prematurely
356 * 6="ended" : consumption has ended without input error
357 * 7="aborted" : consumption has ended after input error
358 */
iso_ring_buffer_get_status(struct burn_source * b,size_t * size,size_t * free_bytes)359 int iso_ring_buffer_get_status(struct burn_source *b, size_t *size,
360 size_t *free_bytes)
361 {
362 int ret;
363 IsoRingBuffer *buf;
364 if (b == NULL) {
365 return ISO_NULL_POINTER;
366 }
367 buf = ((Ecma119Image*)(b->data))->buffer;
368 ret = iso_ring_buffer_get_buf_status(buf, size, free_bytes);
369 return ret;
370 }
371
372