1 /*
2 ** Copyright (C) 2004-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8 
9 #include <silk/silk.h>
10 
11 RCSIDENT("$SiLK: circbuf.c ef14e54179be 2020-04-14 21:57:45Z mthomas $");
12 
13 #include <silk/sklog.h>
14 #include "circbuf.h"
15 
16 #ifdef CIRCBUF_TRACE_LEVEL
17 #define TRACEMSG_LEVEL 1
18 #endif
19 #define TRACEMSG( msg) TRACEMSG_TO_TRACEMSGLVL(1, msg)
20 #include <silk/sktracemsg.h>
21 
22 /* Minimum number of items which should be storable in a chunk */
23 #define SK_CIRCBUF_MINIMUM_ITEMS_PER_CHUNK 3
24 
25 /* Maximum possible size of a single item */
26 #define SK_CIRCBUF_CHUNK_MAXIMUM_ITEM_SIZE                 \
27     ((1 << 28) / SK_CIRCBUF_MINIMUM_ITEMS_PER_CHUNK)
28 
29 
30 /*
31  *    The sk_circbuf_t hands cells to the writing thread which that
32  *    thread fills.  The sk_circbuf_t holds onto these cells until the
33  *    reading thread requests them.  The maxinum number of cells a
34  *    sk_circbuf_t may allocate is specified at creatation time.
35  *    However, the cells are not allocated as one block of memory.
36  *    Instead, the sk_circbuf_t allocates smaller blocks of memory
37  *    called chunks.  All chunks are the same size.  To summarize, the
38  *    sk_circbuf_t is composed of multiple chunks, and a chunk is
39  *    composed of multiple cells.
40  *
41  *    For each chunk, the 'writer' member points to the cell currently
42  *    in use by the writing thread, and the 'reader' member points to
43  *    the cell currently in use by the reading thread.
44  *
45  *    All cells "between" the 'reader' and the 'writer' have data.  In
46  *    the diagram below, the 'writer' has wrapped around, and all
47  *    cells with 'D' have data.  'W' is where the writing thread is
48  *    currently writing data, and 'R' is where the reading thread is
49  *    reading.
50  *
51  *        _ _ _ _ _ _ _ _ _ _ _ _
52  *       |D|D|W|_|_|_|_|_|R|D|D|D|
53  *            A A         A A
54  *            | |         | |
55  *            | next_wtr  | next_rdr
56  *            |           |
57  *            writer      reader
58  *
59  *    When the writing thread or reading thread finishes with a cell,
60  *    it calls the appropriate "get next" function which releases the
61  *    current cell and moves the thread to the next cell.
62  *
63  *    If a chunk becomes full and the number of cells is not at the
64  *    maximum, a new chunk is allocated and the writer starts using
65  *    cells from the new chunk.  Depending on the chunk size and
66  *    maximum number of cells allowed, there may be multiple chunks in
67  *    the chunk list between the writer and the reader.
68  *
69  *    Once the reading thread finishes with all the cells in the
70  *    current chunk, the reader moves to the first cell of the next
71  *    chunk in the chunk list, and the chunk the reader just completed
72  *    is discarded.  The sk_circbuf_t is circular within a chunk, but
73  *    like a linked list between multiple chunks.
74  *
75  *    The first time the sk_circbuf_t has a chunk to discard, the
76  *    sk_circbuf_t stores the chunk as spare (instead of deallocating
77  *    the chunk).  When a chunk needs to be discard and the
78  *    sk_circbuf_t already has a spare chunk, the chunk is
79  *    deallocated.
80  *
81  */
82 typedef struct circbuf_chunk_st circbuf_chunk_t;
83 struct circbuf_chunk_st {
84     /* Next chunk in chunk list */
85     circbuf_chunk_t *next;
86     /* Next writer cell index */
87     uint32_t         next_writer;
88     /* Current writer cell index */
89     uint32_t         writer;
90     /* Next reader cell index */
91     uint32_t         next_reader;
92     /* Current reader cell index */
93     uint32_t         reader;
94     /* Buffer containing cells */
95     uint8_t         *data;
96     /* True if all cells are used */
97     unsigned         full   :1;
98 };
99 
100 
101 /* sk_circbuf_t */
102 struct sk_circbuf_st {
103     /* Maximum number of cells */
104     uint32_t         maxcells;
105     /* Current number of cells in use, across all chunks */
106     uint32_t         cellcount;
107     /* Size of a single cell */
108     uint32_t         cellsize;
109     /* Number of cells per chunk */
110     uint32_t         cells_per_chunk;
111     /* Writer chunk */
112     circbuf_chunk_t *writer_chunk;
113     /* Rreader chunk */
114     circbuf_chunk_t *reader_chunk;
115     /* Spare chunk */
116     circbuf_chunk_t *spare_chunk;
117     /* Mutex */
118     pthread_mutex_t  mutex;
119     /* Condition variable */
120     pthread_cond_t   cond;
121     /* Number of threads waiting on this buf */
122     uint32_t         wait_count;
123     /* True if the buf has been stopped */
124     unsigned         destroyed : 1;
125 };
126 /* typedef struct sk_circbuf_st sk_circbuf_t; */
127 
128 
129 /* Allocate a new chunk */
130 static circbuf_chunk_t *
circbuf_alloc_chunk(sk_circbuf_t * buf)131 circbuf_alloc_chunk(
132     sk_circbuf_t       *buf)
133 {
134     circbuf_chunk_t *chunk;
135 
136     if (buf->spare_chunk) {
137         /* If there is a spare chunk, use it.  We maintain a spare
138          * chunk to avoid reallocating frequently when items are
139          * removed more quickly then they are added. */
140         chunk = buf->spare_chunk;
141         buf->spare_chunk = NULL;
142         chunk->next_writer = chunk->reader = 0;
143     } else {
144         /* Otherwise, allocate a new chunk. */
145         chunk = (circbuf_chunk_t*)calloc(1, sizeof(circbuf_chunk_t));
146         if (chunk == NULL) {
147             return NULL;
148         }
149         chunk->data = (uint8_t*)malloc(buf->cells_per_chunk * buf->cellsize);
150         if (chunk->data == NULL) {
151             free(chunk);
152             return NULL;
153         }
154     }
155     chunk->writer = buf->cells_per_chunk - 1;
156     chunk->next_reader = 1;
157     chunk->next = NULL;
158 
159     return chunk;
160 }
161 
162 
163 int
skCircBufCreate(sk_circbuf_t ** buf_out,uint32_t item_size,uint32_t item_count)164 skCircBufCreate(
165     sk_circbuf_t      **buf_out,
166     uint32_t            item_size,
167     uint32_t            item_count)
168 {
169     sk_circbuf_t *buf;
170     uint32_t chunks;
171 
172     if (NULL == buf_out) {
173         return SK_CIRCBUF_E_BAD_PARAM;
174     }
175     *buf_out = NULL;
176 
177     if (item_count == 0
178         || item_size == 0
179         || item_size > SK_CIRCBUF_CHUNK_MAXIMUM_ITEM_SIZE)
180     {
181         return SK_CIRCBUF_E_BAD_PARAM;
182     }
183 
184     buf = (sk_circbuf_t*)calloc(1, sizeof(sk_circbuf_t));
185     if (buf == NULL) {
186         return SK_CIRCBUF_E_ALLOC;
187     }
188 
189     buf->cellsize = item_size;
190 
191     buf->cells_per_chunk = SK_CIRCBUF_CHUNK_MAX_SIZE / item_size;
192     if (buf->cells_per_chunk < SK_CIRCBUF_MINIMUM_ITEMS_PER_CHUNK) {
193         buf->cells_per_chunk = SK_CIRCBUF_MINIMUM_ITEMS_PER_CHUNK;
194     }
195 
196     /* Number of chunks required to handle item_count cells */
197     chunks = 1 + (item_count - 1) / buf->cells_per_chunk;
198     buf->maxcells = buf->cells_per_chunk * chunks;
199 
200     /* Create the initial chunk */
201     buf->reader_chunk = buf->writer_chunk = circbuf_alloc_chunk(buf);
202     if (buf->reader_chunk == NULL) {
203         free(buf);
204         return SK_CIRCBUF_E_ALLOC;
205     }
206     /* The initial chunk needs to pretend that its reader starts at -1
207      * instead of 0, because its reader is not coming from a previous
208      * chunk.  This is a special case that should only happen once. */
209     buf->reader_chunk->reader = buf->cells_per_chunk - 1;
210     buf->reader_chunk->next_reader = 0;
211 
212     pthread_mutex_init(&buf->mutex, NULL);
213     pthread_cond_init(&buf->cond, NULL);
214     *buf_out = buf;
215     return SK_CIRCBUF_OK;
216 }
217 
218 
219 int
skCircBufGetWriterBlock(sk_circbuf_t * buf,void * writer_pos,uint32_t * out_item_count)220 skCircBufGetWriterBlock(
221     sk_circbuf_t       *buf,
222     void               *writer_pos,
223     uint32_t           *out_item_count)
224 {
225     int retval;
226 
227     assert(buf);
228     assert(writer_pos);
229 
230     pthread_mutex_lock(&buf->mutex);
231 
232     ++buf->wait_count;
233 
234     /* Wait for an empty cell */
235     while (!buf->destroyed && (buf->cellcount == buf->maxcells)) {
236         TRACEMSG((("skCircBufGetWriterBlock() full, count is %" PRIu32),
237                   buf->cellcount));
238         pthread_cond_wait(&buf->cond, &buf->mutex);
239     }
240 
241     if (buf->cellcount <= 1) {
242         /* If previously, the buffer was empty, signal waiters */
243         pthread_cond_broadcast(&buf->cond);
244     }
245 
246     /* Increment the cell count */
247     ++buf->cellcount;
248 
249     if (out_item_count) {
250         *out_item_count = buf->cellcount;
251     }
252 
253     if (buf->destroyed) {
254         *(uint8_t**)writer_pos = NULL;
255         retval = SK_CIRCBUF_E_STOPPED;
256         pthread_cond_broadcast(&buf->cond);
257     } else {
258         /* Get the writer chunk */
259         circbuf_chunk_t *chunk = buf->writer_chunk;
260 
261         /* If the writer chunk is full */
262         if (chunk->full) {
263             assert(chunk->next == NULL);
264             chunk->next = circbuf_alloc_chunk(buf);
265             if (chunk->next == NULL) {
266                 *(uint8_t**)writer_pos = NULL;
267                 retval = SK_CIRCBUF_E_ALLOC;
268                 goto END;
269             }
270 
271             /* Make the next chunk the new writer chunk*/
272             chunk = chunk->next;
273             assert(chunk->next == NULL);
274             buf->writer_chunk = chunk;
275         }
276         /* Return value is the next writer position */
277         *(uint8_t**)writer_pos = &chunk->data[chunk->next_writer
278                                               * buf->cellsize];
279         retval = SK_CIRCBUF_OK;
280 
281         /* Increment the current writer and the next_writer,
282          * accounting for wrapping of the next_writer */
283         chunk->writer = chunk->next_writer;
284         ++chunk->next_writer;
285         if (chunk->next_writer == buf->cells_per_chunk) {
286             chunk->next_writer = 0;
287         }
288 
289         /* Check to see if we have filled this chunk */
290         if (chunk->next_writer == chunk->reader) {
291             chunk->full = 1;
292         }
293     }
294 
295   END:
296 
297     --buf->wait_count;
298 
299     pthread_mutex_unlock(&buf->mutex);
300 
301     return retval;
302 }
303 
304 
305 int
skCircBufGetReaderBlock(sk_circbuf_t * buf,void * reader_pos,uint32_t * out_item_count)306 skCircBufGetReaderBlock(
307     sk_circbuf_t       *buf,
308     void               *reader_pos,
309     uint32_t           *out_item_count)
310 {
311     int retval;
312 
313     assert(buf);
314     assert(reader_pos);
315 
316     pthread_mutex_lock(&buf->mutex);
317 
318     ++buf->wait_count;
319 
320     /* Wait for a full cell */
321     while (!buf->destroyed && (buf->cellcount <= 1)) {
322         pthread_cond_wait(&buf->cond, &buf->mutex);
323     }
324 
325     /* If previously, the buffer was full, signal waiters */
326     if (buf->cellcount == buf->maxcells) {
327         pthread_cond_broadcast(&buf->cond);
328     }
329 
330     if (out_item_count) {
331         *out_item_count = buf->cellcount;
332     }
333 
334     /* Decrement the cell count */
335     --buf->cellcount;
336 
337     if (buf->destroyed) {
338         *(uint8_t**)reader_pos = NULL;
339         retval = SK_CIRCBUF_E_STOPPED;
340         pthread_cond_broadcast(&buf->cond);
341     } else {
342         /* Get the reader chunk */
343         circbuf_chunk_t *chunk = buf->reader_chunk;
344 
345         /* Mark the chunk as not full */
346         chunk->full = 0;
347 
348         /* Increment the reader and the next_reader, accounting for
349          * wrapping of the next_reader */
350         chunk->reader = chunk->next_reader;
351         ++chunk->next_reader;
352         if (chunk->next_reader == buf->cells_per_chunk) {
353             chunk->next_reader = 0;
354         }
355 
356         /* Move to next chunk if we have emptied this one (and not last) */
357         if (chunk->reader == chunk->next_writer) {
358             circbuf_chunk_t *next_chunk;
359 
360             next_chunk = chunk->next;
361 
362             /* Free the reader chunk.  Save as spare_chunk if empty */
363             if (buf->spare_chunk) {
364                 free(chunk->data);
365                 free(chunk);
366             } else {
367                 buf->spare_chunk = chunk;
368             }
369 
370             chunk = buf->reader_chunk = next_chunk;
371             assert(chunk);
372         }
373 
374         /* Return value is the current reader position */
375         *(uint8_t**)reader_pos = &chunk->data[chunk->reader * buf->cellsize];
376         retval = SK_CIRCBUF_OK;
377     }
378 
379     --buf->wait_count;
380 
381     pthread_mutex_unlock(&buf->mutex);
382 
383     return retval;
384 }
385 
386 
387 void
skCircBufStop(sk_circbuf_t * buf)388 skCircBufStop(
389     sk_circbuf_t       *buf)
390 {
391     pthread_mutex_lock(&buf->mutex);
392     buf->destroyed = 1;
393     pthread_cond_broadcast(&buf->cond);
394     while (buf->wait_count) {
395         pthread_cond_wait(&buf->cond, &buf->mutex);
396     }
397     pthread_mutex_unlock(&buf->mutex);
398 }
399 
400 
401 void
skCircBufDestroy(sk_circbuf_t * buf)402 skCircBufDestroy(
403     sk_circbuf_t       *buf)
404 {
405     circbuf_chunk_t *chunk;
406     circbuf_chunk_t *next_chunk;
407 
408     if (!buf) {
409         return;
410     }
411     pthread_mutex_lock(&buf->mutex);
412     if (!buf->destroyed) {
413         buf->destroyed = 1;
414         pthread_cond_broadcast(&buf->cond);
415         while (buf->wait_count) {
416             pthread_cond_wait(&buf->cond, &buf->mutex);
417         }
418     }
419     TRACEMSG((("skCircBufDestroy(): Buffer has %" PRIu32 " records"),
420               buf->cellcount));
421     pthread_mutex_unlock(&buf->mutex);
422 
423     pthread_mutex_destroy(&buf->mutex);
424     pthread_cond_destroy(&buf->cond);
425 
426     chunk = buf->reader_chunk;
427     while (chunk) {
428         next_chunk = chunk->next;
429         free(chunk->data);
430         free(chunk);
431         chunk = next_chunk;
432     }
433 
434     if (buf->spare_chunk) {
435         free(buf->spare_chunk->data);
436         free(buf->spare_chunk);
437     }
438 
439     free(buf);
440 }
441 
442 
443 /*
444 ** Local Variables:
445 ** mode:c
446 ** indent-tabs-mode:nil
447 ** c-basic-offset:4
448 ** End:
449 */
450