1 /*
2 ** Copyright (C) 2004-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8
9 #include <silk/silk.h>
10
11 RCSIDENT("$SiLK: circbuf.c ef14e54179be 2020-04-14 21:57:45Z mthomas $");
12
13 #include <silk/sklog.h>
14 #include "circbuf.h"
15
16 #ifdef CIRCBUF_TRACE_LEVEL
17 #define TRACEMSG_LEVEL 1
18 #endif
19 #define TRACEMSG( msg) TRACEMSG_TO_TRACEMSGLVL(1, msg)
20 #include <silk/sktracemsg.h>
21
22 /* Minimum number of items which should be storable in a chunk */
23 #define SK_CIRCBUF_MINIMUM_ITEMS_PER_CHUNK 3
24
25 /* Maximum possible size of a single item */
26 #define SK_CIRCBUF_CHUNK_MAXIMUM_ITEM_SIZE \
27 ((1 << 28) / SK_CIRCBUF_MINIMUM_ITEMS_PER_CHUNK)
28
29
30 /*
31 * The sk_circbuf_t hands cells to the writing thread which that
32 * thread fills. The sk_circbuf_t holds onto these cells until the
 *    reading thread requests them.  The maximum number of cells a
 *    sk_circbuf_t may allocate is specified at creation time.
35 * However, the cells are not allocated as one block of memory.
36 * Instead, the sk_circbuf_t allocates smaller blocks of memory
37 * called chunks. All chunks are the same size. To summarize, the
38 * sk_circbuf_t is composed of multiple chunks, and a chunk is
39 * composed of multiple cells.
40 *
41 * For each chunk, the 'writer' member points to the cell currently
42 * in use by the writing thread, and the 'reader' member points to
43 * the cell currently in use by the reading thread.
44 *
45 * All cells "between" the 'reader' and the 'writer' have data. In
46 * the diagram below, the 'writer' has wrapped around, and all
47 * cells with 'D' have data. 'W' is where the writing thread is
48 * currently writing data, and 'R' is where the reading thread is
49 * reading.
50 *
51 * _ _ _ _ _ _ _ _ _ _ _ _
52 * |D|D|W|_|_|_|_|_|R|D|D|D|
53 * A A A A
54 * | | | |
55 * | next_wtr | next_rdr
56 * | |
57 * writer reader
58 *
59 * When the writing thread or reading thread finishes with a cell,
60 * it calls the appropriate "get next" function which releases the
61 * current cell and moves the thread to the next cell.
62 *
63 * If a chunk becomes full and the number of cells is not at the
64 * maximum, a new chunk is allocated and the writer starts using
65 * cells from the new chunk. Depending on the chunk size and
66 * maximum number of cells allowed, there may be multiple chunks in
67 * the chunk list between the writer and the reader.
68 *
69 * Once the reading thread finishes with all the cells in the
70 * current chunk, the reader moves to the first cell of the next
71 * chunk in the chunk list, and the chunk the reader just completed
72 * is discarded. The sk_circbuf_t is circular within a chunk, but
73 * like a linked list between multiple chunks.
74 *
75 * The first time the sk_circbuf_t has a chunk to discard, the
76 * sk_circbuf_t stores the chunk as spare (instead of deallocating
 *    the chunk).  When a chunk needs to be discarded and the
78 * sk_circbuf_t already has a spare chunk, the chunk is
79 * deallocated.
80 *
81 */
typedef struct circbuf_chunk_st circbuf_chunk_t;
struct circbuf_chunk_st {
    /* Next chunk in the singly-linked chunk list; NULL on the last
     * (writer's) chunk */
    circbuf_chunk_t    *next;
    /* Index of the cell the writing thread receives next */
    uint32_t            next_writer;
    /* Index of the cell currently held by the writing thread */
    uint32_t            writer;
    /* Index of the cell the reading thread receives next */
    uint32_t            next_reader;
    /* Index of the cell currently held by the reading thread */
    uint32_t            reader;
    /* Buffer holding the cells; cells_per_chunk * cellsize bytes */
    uint8_t            *data;
    /* True if all cells in this chunk are in use */
    unsigned            full :1;
};
99
100
/* sk_circbuf_t */
struct sk_circbuf_st {
    /* Maximum number of cells; rounded up to a whole number of chunks
     * at creation time */
    uint32_t            maxcells;
    /* Current number of cells in use, across all chunks */
    uint32_t            cellcount;
    /* Size of a single cell, in bytes */
    uint32_t            cellsize;
    /* Number of cells per chunk */
    uint32_t            cells_per_chunk;
    /* Chunk currently used by the writing thread */
    circbuf_chunk_t    *writer_chunk;
    /* Chunk currently used by the reading thread */
    circbuf_chunk_t    *reader_chunk;
    /* Retired chunk cached for reuse (avoids reallocation when chunks
     * cycle frequently); NULL when no spare is available */
    circbuf_chunk_t    *spare_chunk;
    /* Mutex protecting all members of this structure */
    pthread_mutex_t     mutex;
    /* Condition variable used to block/wake the reader, the writer,
     * and the thread stopping the buffer */
    pthread_cond_t      cond;
    /* Number of threads waiting on this buf */
    uint32_t            wait_count;
    /* True once the buf has been stopped (skCircBufStop() or
     * skCircBufDestroy()) */
    unsigned            destroyed : 1;
};
/* typedef struct sk_circbuf_st sk_circbuf_t; */
127
128
129 /* Allocate a new chunk */
130 static circbuf_chunk_t *
circbuf_alloc_chunk(sk_circbuf_t * buf)131 circbuf_alloc_chunk(
132 sk_circbuf_t *buf)
133 {
134 circbuf_chunk_t *chunk;
135
136 if (buf->spare_chunk) {
137 /* If there is a spare chunk, use it. We maintain a spare
138 * chunk to avoid reallocating frequently when items are
139 * removed more quickly then they are added. */
140 chunk = buf->spare_chunk;
141 buf->spare_chunk = NULL;
142 chunk->next_writer = chunk->reader = 0;
143 } else {
144 /* Otherwise, allocate a new chunk. */
145 chunk = (circbuf_chunk_t*)calloc(1, sizeof(circbuf_chunk_t));
146 if (chunk == NULL) {
147 return NULL;
148 }
149 chunk->data = (uint8_t*)malloc(buf->cells_per_chunk * buf->cellsize);
150 if (chunk->data == NULL) {
151 free(chunk);
152 return NULL;
153 }
154 }
155 chunk->writer = buf->cells_per_chunk - 1;
156 chunk->next_reader = 1;
157 chunk->next = NULL;
158
159 return chunk;
160 }
161
162
163 int
skCircBufCreate(sk_circbuf_t ** buf_out,uint32_t item_size,uint32_t item_count)164 skCircBufCreate(
165 sk_circbuf_t **buf_out,
166 uint32_t item_size,
167 uint32_t item_count)
168 {
169 sk_circbuf_t *buf;
170 uint32_t chunks;
171
172 if (NULL == buf_out) {
173 return SK_CIRCBUF_E_BAD_PARAM;
174 }
175 *buf_out = NULL;
176
177 if (item_count == 0
178 || item_size == 0
179 || item_size > SK_CIRCBUF_CHUNK_MAXIMUM_ITEM_SIZE)
180 {
181 return SK_CIRCBUF_E_BAD_PARAM;
182 }
183
184 buf = (sk_circbuf_t*)calloc(1, sizeof(sk_circbuf_t));
185 if (buf == NULL) {
186 return SK_CIRCBUF_E_ALLOC;
187 }
188
189 buf->cellsize = item_size;
190
191 buf->cells_per_chunk = SK_CIRCBUF_CHUNK_MAX_SIZE / item_size;
192 if (buf->cells_per_chunk < SK_CIRCBUF_MINIMUM_ITEMS_PER_CHUNK) {
193 buf->cells_per_chunk = SK_CIRCBUF_MINIMUM_ITEMS_PER_CHUNK;
194 }
195
196 /* Number of chunks required to handle item_count cells */
197 chunks = 1 + (item_count - 1) / buf->cells_per_chunk;
198 buf->maxcells = buf->cells_per_chunk * chunks;
199
200 /* Create the initial chunk */
201 buf->reader_chunk = buf->writer_chunk = circbuf_alloc_chunk(buf);
202 if (buf->reader_chunk == NULL) {
203 free(buf);
204 return SK_CIRCBUF_E_ALLOC;
205 }
206 /* The initial chunk needs to pretend that its reader starts at -1
207 * instead of 0, because its reader is not coming from a previous
208 * chunk. This is a special case that should only happen once. */
209 buf->reader_chunk->reader = buf->cells_per_chunk - 1;
210 buf->reader_chunk->next_reader = 0;
211
212 pthread_mutex_init(&buf->mutex, NULL);
213 pthread_cond_init(&buf->cond, NULL);
214 *buf_out = buf;
215 return SK_CIRCBUF_OK;
216 }
217
218
/*
 *    Hand the writing thread the address of its next cell.
 *
 *    Blocks while the buffer is full (cellcount == maxcells) and has
 *    not been stopped.  On success, stores the cell's address in the
 *    location referenced by 'writer_pos' and returns SK_CIRCBUF_OK.
 *    If the buffer has been stopped, stores NULL and returns
 *    SK_CIRCBUF_E_STOPPED; if a new chunk is required and cannot be
 *    allocated, stores NULL and returns SK_CIRCBUF_E_ALLOC.  When
 *    'out_item_count' is non-NULL, it receives the number of cells in
 *    use (counting the cell just handed out).
 */
int
skCircBufGetWriterBlock(
    sk_circbuf_t       *buf,
    void               *writer_pos,
    uint32_t           *out_item_count)
{
    int retval;

    assert(buf);
    assert(writer_pos);

    pthread_mutex_lock(&buf->mutex);

    /* Account for this thread so skCircBufStop() can wait for it */
    ++buf->wait_count;

    /* Wait for an empty cell */
    while (!buf->destroyed && (buf->cellcount == buf->maxcells)) {
        TRACEMSG((("skCircBufGetWriterBlock() full, count is %" PRIu32),
                  buf->cellcount));
        pthread_cond_wait(&buf->cond, &buf->mutex);
    }

    if (buf->cellcount <= 1) {
        /* The buffer was effectively empty -- the reading thread
         * blocks while cellcount <= 1 -- so wake any waiting reader
         * now that a cell is about to be added */
        pthread_cond_broadcast(&buf->cond);
    }

    /* Increment the cell count for the cell handed out below.
     * NOTE(review): this increment also happens on the stopped path
     * below, where no cell is actually handed out; harmless once the
     * buffer is stopped, but worth confirming. */
    ++buf->cellcount;

    if (out_item_count) {
        *out_item_count = buf->cellcount;
    }

    if (buf->destroyed) {
        /* Buffer was stopped: return no cell, and wake the thread in
         * skCircBufStop()/skCircBufDestroy() waiting on wait_count */
        *(uint8_t**)writer_pos = NULL;
        retval = SK_CIRCBUF_E_STOPPED;
        pthread_cond_broadcast(&buf->cond);
    } else {
        /* Get the writer chunk */
        circbuf_chunk_t *chunk = buf->writer_chunk;

        /* If the writer chunk is full, append a fresh chunk to the
         * chunk list and move the writer into it */
        if (chunk->full) {
            assert(chunk->next == NULL);
            chunk->next = circbuf_alloc_chunk(buf);
            if (chunk->next == NULL) {
                *(uint8_t**)writer_pos = NULL;
                retval = SK_CIRCBUF_E_ALLOC;
                goto END;
            }

            /* Make the next chunk the new writer chunk */
            chunk = chunk->next;
            assert(chunk->next == NULL);
            buf->writer_chunk = chunk;
        }
        /* Return value is the next writer position */
        *(uint8_t**)writer_pos = &chunk->data[chunk->next_writer
                                              * buf->cellsize];
        retval = SK_CIRCBUF_OK;

        /* Increment the current writer and the next_writer,
         * accounting for wrapping of the next_writer */
        chunk->writer = chunk->next_writer;
        ++chunk->next_writer;
        if (chunk->next_writer == buf->cells_per_chunk) {
            chunk->next_writer = 0;
        }

        /* Check to see if we have filled this chunk */
        if (chunk->next_writer == chunk->reader) {
            chunk->full = 1;
        }
    }

  END:

    --buf->wait_count;

    pthread_mutex_unlock(&buf->mutex);

    return retval;
}
303
304
/*
 *    Hand the reading thread the address of its next filled cell.
 *
 *    Blocks while no cell is available (cellcount <= 1 -- one cell is
 *    always reserved for the cell the writing thread currently holds)
 *    and the buffer has not been stopped.  On success, stores the
 *    cell's address in the location referenced by 'reader_pos' and
 *    returns SK_CIRCBUF_OK.  If the buffer has been stopped, stores
 *    NULL and returns SK_CIRCBUF_E_STOPPED.  When 'out_item_count' is
 *    non-NULL, it receives the cell count as it was before this call
 *    released a cell.
 */
int
skCircBufGetReaderBlock(
    sk_circbuf_t       *buf,
    void               *reader_pos,
    uint32_t           *out_item_count)
{
    int retval;

    assert(buf);
    assert(reader_pos);

    pthread_mutex_lock(&buf->mutex);

    /* Account for this thread so skCircBufStop() can wait for it */
    ++buf->wait_count;

    /* Wait for a full cell */
    while (!buf->destroyed && (buf->cellcount <= 1)) {
        pthread_cond_wait(&buf->cond, &buf->mutex);
    }

    /* If previously, the buffer was full, signal waiters (a writer
     * blocks while cellcount == maxcells) */
    if (buf->cellcount == buf->maxcells) {
        pthread_cond_broadcast(&buf->cond);
    }

    if (out_item_count) {
        *out_item_count = buf->cellcount;
    }

    /* Decrement the cell count */
    --buf->cellcount;

    if (buf->destroyed) {
        /* Buffer was stopped: return no cell, and wake the thread in
         * skCircBufStop()/skCircBufDestroy() waiting on wait_count */
        *(uint8_t**)reader_pos = NULL;
        retval = SK_CIRCBUF_E_STOPPED;
        pthread_cond_broadcast(&buf->cond);
    } else {
        /* Get the reader chunk */
        circbuf_chunk_t *chunk = buf->reader_chunk;

        /* Mark the chunk as not full */
        chunk->full = 0;

        /* Increment the reader and the next_reader, accounting for
         * wrapping of the next_reader */
        chunk->reader = chunk->next_reader;
        ++chunk->next_reader;
        if (chunk->next_reader == buf->cells_per_chunk) {
            chunk->next_reader = 0;
        }

        /* Move to next chunk if we have emptied this one (and not last) */
        if (chunk->reader == chunk->next_writer) {
            circbuf_chunk_t *next_chunk;

            next_chunk = chunk->next;

            /* Retire the emptied reader chunk: keep it as the spare
             * if there is no spare yet, otherwise free it */
            if (buf->spare_chunk) {
                free(chunk->data);
                free(chunk);
            } else {
                buf->spare_chunk = chunk;
            }

            chunk = buf->reader_chunk = next_chunk;
            assert(chunk);
        }

        /* Return value is the current reader position */
        *(uint8_t**)reader_pos = &chunk->data[chunk->reader * buf->cellsize];
        retval = SK_CIRCBUF_OK;
    }

    --buf->wait_count;

    pthread_mutex_unlock(&buf->mutex);

    return retval;
}
385
386
/*
 *    Stop the circular buffer: mark it as destroyed, wake every
 *    thread blocked on its condition variable, and wait until all
 *    threads inside a Get*Block() call have left.  After this call
 *    the Get*Block() functions return SK_CIRCBUF_E_STOPPED.
 */
void
skCircBufStop(
    sk_circbuf_t       *buf)
{
    pthread_mutex_lock(&buf->mutex);
    buf->destroyed = 1;
    /* Wake threads blocked in skCircBufGetWriterBlock() or
     * skCircBufGetReaderBlock() */
    pthread_cond_broadcast(&buf->cond);
    /* Threads leaving on the stopped path broadcast before exiting,
     * allowing this loop to re-check wait_count until no thread
     * remains inside the buffer */
    while (buf->wait_count) {
        pthread_cond_wait(&buf->cond, &buf->mutex);
    }
    pthread_mutex_unlock(&buf->mutex);
}
399
400
401 void
skCircBufDestroy(sk_circbuf_t * buf)402 skCircBufDestroy(
403 sk_circbuf_t *buf)
404 {
405 circbuf_chunk_t *chunk;
406 circbuf_chunk_t *next_chunk;
407
408 if (!buf) {
409 return;
410 }
411 pthread_mutex_lock(&buf->mutex);
412 if (!buf->destroyed) {
413 buf->destroyed = 1;
414 pthread_cond_broadcast(&buf->cond);
415 while (buf->wait_count) {
416 pthread_cond_wait(&buf->cond, &buf->mutex);
417 }
418 }
419 TRACEMSG((("skCircBufDestroy(): Buffer has %" PRIu32 " records"),
420 buf->cellcount));
421 pthread_mutex_unlock(&buf->mutex);
422
423 pthread_mutex_destroy(&buf->mutex);
424 pthread_cond_destroy(&buf->cond);
425
426 chunk = buf->reader_chunk;
427 while (chunk) {
428 next_chunk = chunk->next;
429 free(chunk->data);
430 free(chunk);
431 chunk = next_chunk;
432 }
433
434 if (buf->spare_chunk) {
435 free(buf->spare_chunk->data);
436 free(buf->spare_chunk);
437 }
438
439 free(buf);
440 }
441
442
443 /*
444 ** Local Variables:
445 ** mode:c
446 ** indent-tabs-mode:nil
447 ** c-basic-offset:4
448 ** End:
449 */
450