1 /*
2  * Copyright (c) 2007 Vreixo Formoso
3  *
4  * This file is part of the libisofs project; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License version 2
6  * or later as published by the Free Software Foundation.
7  * See COPYING file for details.
8  */
9 
/*
 * Synchronized ring buffer, works with one writer thread and one reader
 * thread.
 *
 * TODO #00010 : optimize ring buffer
 *  - write/read at the end of buffer requires a second mutex_lock, even if
 *    there's enough space/data at the beginning
 *  - pre-buffer for writes < BLOCK_SIZE
 *
 */
19 
20 #ifdef HAVE_CONFIG_H
21 #include "../config.h"
22 #endif
23 
24 /*
25    Use the copy of the struct burn_source definition in libisofs.h
26 */
27 #define LIBISOFS_WITHOUT_LIBBURN yes
28 #include "libisofs.h"
29 
30 #include "buffer.h"
31 #include "ecma119.h"
32 
#include <pthread.h>
#include <stdint.h>
#include <string.h>
35 
36 #ifndef MIN
37 #   define MIN(a, b) (((a) < (b)) ? (a) : (b))
38 #endif
39 
/*
 * Bookkeeping for a byte ring buffer that is filled by a writer and drained
 * by a reader. The read, write, close and status functions of this file
 * take `mutex` before touching the fields.
 */
struct iso_ring_buffer
{
    /* backing storage of `cap` bytes */
    uint8_t *buf;

    /* capacity of the storage, in bytes */
    size_t cap;

    /* number of bytes currently stored and not yet consumed */
    size_t size;

    /* offsets from `buf`: next byte to read, next byte to write */
    size_t rpos;
    size_t wpos;

    /*
     * End flags of the reader (rend) and of the writer (wend):
     *   0 = not finished, 1 = finished ok, 2 = finished with error
     */
    unsigned int rend : 2;
    unsigned int wend : 2;

    /*
     * Statistics only: how often the writer found the buffer full and
     * how often the reader found it empty.
     */
    unsigned int times_full;
    unsigned int times_empty;

    pthread_mutex_t mutex;
    /* the reader waits on `empty` for data */
    pthread_cond_t empty;
    /* the writer waits on `full` for space */
    pthread_cond_t full;
};
73 
74 /**
75  * Create a new buffer.
76  *
77  * The created buffer should be freed with iso_ring_buffer_free()
78  *
79  * @param size
80  *     Number of blocks in buffer. You should supply a number >= 32, otherwise
81  *     size will be ignored and 32 will be used by default, which leads to a
82  *     64 KiB buffer.
83  * @return
84  *     1 success, < 0 error
85  */
iso_ring_buffer_new(size_t size,IsoRingBuffer ** rbuf)86 int iso_ring_buffer_new(size_t size, IsoRingBuffer **rbuf)
87 {
88     IsoRingBuffer *buffer;
89 
90     if (rbuf == NULL) {
91         return ISO_NULL_POINTER;
92     }
93 
94     buffer = malloc(sizeof(IsoRingBuffer));
95     if (buffer == NULL) {
96         return ISO_OUT_OF_MEM;
97     }
98 
99     buffer->cap = (size > 32 ? size : 32) * BLOCK_SIZE;
100     buffer->buf = malloc(buffer->cap);
101     if (buffer->buf == NULL) {
102         free(buffer);
103         return ISO_OUT_OF_MEM;
104     }
105 
106     buffer->size = 0;
107     buffer->wpos = 0;
108     buffer->rpos = 0;
109 
110     buffer->times_full = 0;
111     buffer->times_empty = 0;
112 
113     buffer->rend = buffer->wend = 0;
114 
115     /* init mutex and waiting queues */
116     pthread_mutex_init(&buffer->mutex, NULL);
117     pthread_cond_init(&buffer->empty, NULL);
118     pthread_cond_init(&buffer->full, NULL);
119 
120     *rbuf = buffer;
121     return ISO_SUCCESS;
122 }
123 
/**
 * Dispose a ring buffer: release its storage, destroy its mutex and both
 * condition variables, and free the object itself.
 *
 * @param buf
 *      the buffer to dispose; NULL is accepted and ignored
 */
void iso_ring_buffer_free(IsoRingBuffer *buf)
{
    if (buf != NULL) {
        free(buf->buf);
        pthread_mutex_destroy(&buf->mutex);
        pthread_cond_destroy(&buf->empty);
        pthread_cond_destroy(&buf->full);
        free(buf);
    }
}
135 
136 /**
137  * Write count bytes into buffer. It blocks until all bytes where written or
138  * reader close the buffer.
139  *
140  * @param buf
141  *      the buffer
142  * @param data
143  *      pointer to a memory region of at least coun bytes, from which data
144  *      will be read.
145  * @param
146  *      Number of bytes to write
147  * @return
148  *      1 success, 0 read finished, < 0 error
149  */
iso_ring_buffer_write(IsoRingBuffer * buf,uint8_t * data,size_t count)150 int iso_ring_buffer_write(IsoRingBuffer *buf, uint8_t *data, size_t count)
151 {
152     size_t len;
153     size_t bytes_write = 0;
154 
155     if (buf == NULL || data == NULL) {
156         return ISO_NULL_POINTER;
157     }
158 
159     while (bytes_write < count) {
160 
161         pthread_mutex_lock(&buf->mutex);
162 
163         while (buf->size == buf->cap) {
164 
165             /*
166              * Note. There's only a writer, so we have no race conditions.
167              * Thus, the while(buf->size == buf->cap) is used here
168              * only to properly detect the reader has been cancelled
169              */
170 
171             if (buf->rend) {
172                 /* the read procces has been finished */
173                 pthread_mutex_unlock(&buf->mutex);
174                 return 0;
175             }
176             buf->times_full++;
177             /* wait until space available */
178             pthread_cond_wait(&buf->full, &buf->mutex);
179         }
180 
181         len = MIN(count - bytes_write, buf->cap - buf->size);
182         if (buf->wpos + len > buf->cap) {
183             len = buf->cap - buf->wpos;
184         }
185         memcpy(buf->buf + buf->wpos, data + bytes_write, len);
186         buf->wpos = (buf->wpos + len) % (buf->cap);
187         bytes_write += len;
188         buf->size += len;
189 
190         /* wake up reader */
191         pthread_cond_signal(&buf->empty);
192         pthread_mutex_unlock(&buf->mutex);
193     }
194     return ISO_SUCCESS;
195 }
196 
197 /**
198  * Read count bytes from the buffer into dest. It blocks until the desired
199  * bytes has been read. If the writer finishes before outputting enough
200  * bytes, 0 (EOF) is returned, the number of bytes already read remains
201  * unknown.
202  *
203  * @return
204  *      1 success, 0 EOF, < 0 error
205  */
iso_ring_buffer_read(IsoRingBuffer * buf,uint8_t * dest,size_t count)206 int iso_ring_buffer_read(IsoRingBuffer *buf, uint8_t *dest, size_t count)
207 {
208     size_t len;
209     size_t bytes_read = 0;
210 
211     if (buf == NULL || dest == NULL) {
212         return ISO_NULL_POINTER;
213     }
214 
215     while (bytes_read < count) {
216         pthread_mutex_lock(&buf->mutex);
217 
218         while (buf->size == 0) {
219             /*
220              * Note. There's only a reader, so we have no race conditions.
221              * Thus, the while(buf->size == 0) is used here just to ensure
222              * a reader detects the EOF properly if the writer has been
223              * canceled while the reader was waiting
224              */
225 
226             if (buf->wend) {
227                 /* the writer procces has been finished */
228                 pthread_mutex_unlock(&buf->mutex);
229                 return 0; /* EOF */
230             }
231             buf->times_empty++;
232             /* wait until data available */
233             pthread_cond_wait(&buf->empty, &buf->mutex);
234         }
235 
236         len = MIN(count - bytes_read, buf->size);
237         if (buf->rpos + len > buf->cap) {
238             len = buf->cap - buf->rpos;
239         }
240         memcpy(dest + bytes_read, buf->buf + buf->rpos, len);
241         buf->rpos = (buf->rpos + len) % (buf->cap);
242         bytes_read += len;
243         buf->size -= len;
244 
245         /* wake up the writer */
246         pthread_cond_signal(&buf->full);
247         pthread_mutex_unlock(&buf->mutex);
248     }
249     return ISO_SUCCESS;
250 }
251 
iso_ring_buffer_writer_close(IsoRingBuffer * buf,int error)252 void iso_ring_buffer_writer_close(IsoRingBuffer *buf, int error)
253 {
254     pthread_mutex_lock(&buf->mutex);
255     buf->wend = error ? 2 : 1;
256 
257     /* ensure no reader is waiting */
258     pthread_cond_signal(&buf->empty);
259     pthread_mutex_unlock(&buf->mutex);
260 }
261 
iso_ring_buffer_reader_close(IsoRingBuffer * buf,int error)262 void iso_ring_buffer_reader_close(IsoRingBuffer *buf, int error)
263 {
264     pthread_mutex_lock(&buf->mutex);
265 
266     if (buf->rend) {
267         /* reader already closed */
268         pthread_mutex_unlock(&buf->mutex);
269         return;
270     }
271 
272     buf->rend = error ? 2 : 1;
273 
274     /* ensure no writer is waiting */
275     pthread_cond_signal(&buf->full);
276     pthread_mutex_unlock(&buf->mutex);
277 }
278 
279 /**
280  * Get the times the buffer was full.
281  */
iso_ring_buffer_get_times_full(IsoRingBuffer * buf)282 unsigned int iso_ring_buffer_get_times_full(IsoRingBuffer *buf)
283 {
284     return buf->times_full;
285 }
286 
287 /**
288  * Get the times the buffer was empty.
289  */
iso_ring_buffer_get_times_empty(IsoRingBuffer * buf)290 unsigned int iso_ring_buffer_get_times_empty(IsoRingBuffer *buf)
291 {
292     return buf->times_empty;
293 }
294 
295 
296 /** Internal via buffer.h
297  *
298  * Get the status of a ring buffer.
299  *
300  * @param buf
301  *      The ring buffer object to inquire
302  * @param size
303  *      Will be filled with the total size of the buffer, in bytes
304  * @param free_bytes
305  *      Will be filled with the bytes currently available in buffer
306  * @return
307  *      < 0 error, > 0 state:
308  *           1="active"    : input and consumption are active
309  *           2="ending"    : input has ended without error
310  *           3="failing"   : input had error and ended,
311  *           5="abandoned" : consumption has ended prematurely
312  *           6="ended"     : consumption has ended without input error
313  *           7="aborted"   : consumption has ended after input error
314  */
iso_ring_buffer_get_buf_status(IsoRingBuffer * buf,size_t * size,size_t * free_bytes)315 int iso_ring_buffer_get_buf_status(IsoRingBuffer *buf, size_t *size,
316                                    size_t *free_bytes)
317 {
318     int ret;
319 
320     if (buf == NULL) {
321         return ISO_NULL_POINTER;
322     }
323 
324     /* get mutex */
325     pthread_mutex_lock(&buf->mutex);
326     if (size) {
327         *size = buf->cap;
328     }
329     if (free_bytes) {
330         *free_bytes = buf->cap - buf->size;
331     }
332 
333     ret = (buf->rend ? 4 : 0) + (buf->wend + 1);
334 
335     pthread_mutex_unlock(&buf->mutex);
336     return ret;
337 }
338 
339 /** API via libisofs.h
340  *
341  * Get the status of the buffer used by a burn_source.
342  *
343  * @param b
344  *      A burn_source previously obtained with
345  *      iso_image_create_burn_source().
346  * @param size
347  *      Will be filled with the total size of the buffer, in bytes
348  * @param free_bytes
349  *      Will be filled with the bytes currently available in buffer
350  * @return
351  *      < 0 error, > 0 state:
352  *           1="active"    : input and consumption are active
353  *           2="ending"    : input has ended without error
354  *           3="failing"   : input had error and ended,
355  *           5="abandoned" : consumption has ended prematurely
356  *           6="ended"     : consumption has ended without input error
357  *           7="aborted"   : consumption has ended after input error
358  */
iso_ring_buffer_get_status(struct burn_source * b,size_t * size,size_t * free_bytes)359 int iso_ring_buffer_get_status(struct burn_source *b, size_t *size,
360                                size_t *free_bytes)
361 {
362     int ret;
363     IsoRingBuffer *buf;
364     if (b == NULL) {
365         return ISO_NULL_POINTER;
366     }
367     buf = ((Ecma119Image*)(b->data))->buffer;
368     ret = iso_ring_buffer_get_buf_status(buf, size, free_bytes);
369     return ret;
370 }
371 
372