1 /*
2 * Copyright (C) 2010-2011 Red Hat, Inc.
3 *
4 * Author: Angus Salkeld <asalkeld@redhat.com>
5 *
6 * This file is part of libqb.
7 *
8 * libqb is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU Lesser General Public License as published by
10 * the Free Software Foundation, either version 2.1 of the License, or
11 * (at your option) any later version.
12 *
13 * libqb is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public License
19 * along with libqb. If not, see <http://www.gnu.org/licenses/>.
20 */
21 #include "ringbuffer_int.h"
22 #include <qb/qbdefs.h>
23 #include "atomic_int.h"
24
/* Layout version stamped into blackbox files by qb_rb_write_to_file()
 * and checked by qb_rb_create_from_file(). */
#define QB_RB_FILE_HEADER_VERSION 1

/*
 * #define CRAZY_DEBUG_PRINTFS 1
 */
#ifdef CRAZY_DEBUG_PRINTFS
/* Chatty tracing, compiled in only when CRAZY_DEBUG_PRINTFS is defined. */
#define DEBUG_PRINTF(format, args...) \
do { \
	printf(format, ##args); \
} while(0)
#else
#define DEBUG_PRINTF(format, args...)
#endif /* CRAZY_DEBUG_PRINTFS */

/*
 * move the write pointer to the next 128 byte boundary
 * write_pt goes in 4 bytes (sizeof(uint32_t))
 * #define USE_CACHE_LINE_ALIGNMENT 1
 */
#ifdef USE_CACHE_LINE_ALIGNMENT
#define QB_CACHE_LINE_SIZE 128
#define QB_CACHE_LINE_WORDS (QB_CACHE_LINE_SIZE/sizeof(uint32_t))
/* Round idx up to the next cache-line boundary, then wrap it back
 * into [0, word_size). */
#define idx_cache_line_step(idx) \
do { \
	if (idx % QB_CACHE_LINE_WORDS) { \
		idx += (QB_CACHE_LINE_WORDS - (idx % QB_CACHE_LINE_WORDS)); \
	} \
	if (idx > (rb->shared_hdr->word_size - 1)) { \
		idx = ((idx) % (rb->shared_hdr->word_size)); \
	} \
} while (0)
#else
#define QB_CACHE_LINE_SIZE 0
#define QB_CACHE_LINE_WORDS 0
/* No cache-line rounding: just wrap idx back into [0, word_size). */
#define idx_cache_line_step(idx) \
do { \
	if (idx > (rb->shared_hdr->word_size - 1)) { \
		idx = ((idx) % (rb->shared_hdr->word_size)); \
	} \
} while (0)
#endif


/* the chunk header is two words
 * 1) the chunk data size
 * 2) the magic number
 */
#define QB_RB_CHUNK_HEADER_WORDS 2
#define QB_RB_CHUNK_HEADER_SIZE (sizeof(uint32_t) * QB_RB_CHUNK_HEADER_WORDS)
/*
 * margin is the gap we leave when checking to see if we have enough
 * space for a new chunk.
 * So:
 * qb_rb_space_free() >= QB_RB_CHUNK_MARGIN + new data chunk
 * The extra word size is to allow for non word sized data chunks.
 * QB_CACHE_LINE_WORDS is to make sure we have space to align the
 * chunk.
 */
#define QB_RB_WORD_ALIGN 1
#define QB_RB_CHUNK_MARGIN (sizeof(uint32_t) * (QB_RB_CHUNK_HEADER_WORDS +\
						QB_RB_WORD_ALIGN +\
						QB_CACHE_LINE_WORDS))
/* Chunk life-cycle magics: ALLOC while a writer is filling the chunk,
 * MAGIC once committed (visible to readers), DEAD after reclaim. */
#define QB_RB_CHUNK_MAGIC		0xA1A1A1A1
#define QB_RB_CHUNK_MAGIC_DEAD		0xD0D0D0D0
#define QB_RB_CHUNK_MAGIC_ALLOC		0xA110CED0
/* First header word: the chunk's payload size in bytes. */
#define QB_RB_CHUNK_SIZE_GET(rb, pointer) rb->shared_data[pointer]
/* Second header word: the magic.  Accessed with acquire/release
 * atomics so writer and reader processes agree on when a chunk
 * becomes visible. */
#define QB_RB_CHUNK_MAGIC_GET(rb, pointer) \
	qb_atomic_int_get_ex((int32_t*)&rb->shared_data[(pointer + 1) % rb->shared_hdr->word_size], \
			     QB_ATOMIC_ACQUIRE)
#define QB_RB_CHUNK_MAGIC_SET(rb, pointer, new_val) \
	qb_atomic_int_set_ex((int32_t*)&rb->shared_data[(pointer + 1) % rb->shared_hdr->word_size], \
			     new_val, QB_ATOMIC_RELEASE)
/* Address of a chunk's payload, just past the two header words. */
#define QB_RB_CHUNK_DATA_GET(rb, pointer) \
	&rb->shared_data[(pointer + QB_RB_CHUNK_HEADER_WORDS) % rb->shared_hdr->word_size]

/* Dump the header then abort if the chunk at _ptr_ is not a
 * committed chunk. */
#define QB_MAGIC_ASSERT(_ptr_) \
do { \
	uint32_t chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, _ptr_); \
	if (chunk_magic != QB_RB_CHUNK_MAGIC) print_header(rb); \
	assert(chunk_magic == QB_RB_CHUNK_MAGIC); \
} while (0)

/* Wrap idx back into [0, word_size) (never rounds to a cache line). */
#define idx_step(idx) \
do { \
	if (idx > (rb->shared_hdr->word_size - 1)) { \
		idx = ((idx) % (rb->shared_hdr->word_size)); \
	} \
} while (0)

static void print_header(struct qb_ringbuffer_s * rb);
static int _rb_chunk_reclaim(struct qb_ringbuffer_s * rb);
116
/*
 * Open a ringbuffer using the default (semaphore) notifier.
 * Convenience wrapper around qb_rb_open_2() with notifiers == NULL.
 */
qb_ringbuffer_t *
qb_rb_open(const char *name, size_t size, uint32_t flags,
	   size_t shared_user_data_size)
{
	return qb_rb_open_2(name, size, flags, shared_user_data_size, NULL);
}
123
124 qb_ringbuffer_t *
qb_rb_open_2(const char * name,size_t size,uint32_t flags,size_t shared_user_data_size,struct qb_rb_notifier * notifiers)125 qb_rb_open_2(const char *name, size_t size, uint32_t flags,
126 size_t shared_user_data_size,
127 struct qb_rb_notifier *notifiers)
128 {
129 struct qb_ringbuffer_s *rb;
130 size_t real_size;
131 size_t shared_size;
132 char path[PATH_MAX];
133 int32_t fd_hdr;
134 int32_t fd_data;
135 uint32_t file_flags = O_RDWR;
136 char filename[PATH_MAX];
137 int32_t error = 0;
138 void *shm_addr;
139 long page_size = sysconf(_SC_PAGESIZE);
140
141 #ifdef QB_ARCH_HPPA
142 page_size = QB_MAX(page_size, 0x00400000); /* align to page colour */
143 #elif defined(QB_FORCE_SHM_ALIGN)
144 page_size = QB_MAX(page_size, 16 * 1024);
145 #endif /* QB_FORCE_SHM_ALIGN */
146 /* The user of this api expects the 'size' parameter passed into this function
147 * to be reflective of the max size single write we can do to the
148 * ringbuffer. This means we have to add both the 'margin' space used
149 * to calculate if there is enough space for a new chunk as well as the '+1' that
150 * prevents overlap of the read/write pointers */
151 size += QB_RB_CHUNK_MARGIN + 1;
152 real_size = QB_ROUNDUP(size, page_size);
153
154 shared_size =
155 sizeof(struct qb_ringbuffer_shared_s) + shared_user_data_size;
156
157 if (flags & QB_RB_FLAG_CREATE) {
158 file_flags |= O_CREAT | O_TRUNC | O_EXCL;
159 }
160
161 rb = calloc(1, sizeof(struct qb_ringbuffer_s));
162 if (rb == NULL) {
163 return NULL;
164 }
165
166 /*
167 * Create a shared_hdr memory segment for the header.
168 */
169 snprintf(filename, PATH_MAX, "%s-header", name);
170 fd_hdr = qb_sys_mmap_file_open(path, filename,
171 shared_size, file_flags);
172 if (fd_hdr < 0) {
173 error = fd_hdr;
174 qb_util_log(LOG_ERR, "couldn't create file for mmap");
175 goto cleanup_hdr;
176 }
177
178 rb->shared_hdr = mmap(0,
179 shared_size,
180 PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);
181
182 if (rb->shared_hdr == MAP_FAILED) {
183 error = -errno;
184 qb_util_log(LOG_ERR, "couldn't create mmap for header");
185 goto cleanup_hdr;
186 }
187 qb_atomic_init();
188
189 rb->flags = flags;
190
191 /*
192 * create the semaphore
193 */
194 if (flags & QB_RB_FLAG_CREATE) {
195 rb->shared_data = NULL;
196 /* rb->shared_hdr->word_size tracks data by ints and not bytes/chars. */
197 rb->shared_hdr->word_size = real_size / sizeof(uint32_t);
198 rb->shared_hdr->write_pt = 0;
199 rb->shared_hdr->read_pt = 0;
200 (void)strlcpy(rb->shared_hdr->hdr_path, path, PATH_MAX);
201 }
202 if (notifiers && notifiers->post_fn) {
203 error = 0;
204 memcpy(&rb->notifier,
205 notifiers,
206 sizeof(struct qb_rb_notifier));
207 } else {
208 error = qb_rb_sem_create(rb, flags);
209 }
210 if (error < 0) {
211 errno = -error;
212 qb_util_perror(LOG_ERR, "couldn't create a semaphore");
213 goto cleanup_hdr;
214 }
215
216 /* Create the shared_data memory segment for the actual ringbuffer.
217 * They have to be separate.
218 */
219 if (flags & QB_RB_FLAG_CREATE) {
220 snprintf(filename, PATH_MAX, "%s-data", name);
221 fd_data = qb_sys_mmap_file_open(path,
222 filename,
223 real_size, file_flags);
224 (void)strlcpy(rb->shared_hdr->data_path, path, PATH_MAX);
225 } else {
226 fd_data = qb_sys_mmap_file_open(path,
227 rb->shared_hdr->data_path,
228 real_size, file_flags);
229 }
230 if (fd_data < 0) {
231 error = fd_data;
232 qb_util_log(LOG_ERR, "couldn't create file for mmap");
233 goto cleanup_hdr;
234 }
235
236 qb_util_log(LOG_DEBUG,
237 "shm size:%ld; real_size:%ld; rb->word_size:%d", size,
238 real_size, rb->shared_hdr->word_size);
239
240 /* this function closes fd_data */
241 error = qb_sys_circular_mmap(fd_data, &shm_addr, real_size);
242 rb->shared_data = shm_addr;
243 if (error != 0) {
244 qb_util_log(LOG_ERR, "couldn't create circular mmap on %s",
245 rb->shared_hdr->data_path);
246 goto cleanup_data;
247 }
248
249 if (flags & QB_RB_FLAG_CREATE) {
250 memset(rb->shared_data, 0, real_size);
251 rb->shared_data[rb->shared_hdr->word_size] = 5;
252 rb->shared_hdr->ref_count = 1;
253 } else {
254 qb_atomic_int_inc(&rb->shared_hdr->ref_count);
255 }
256
257 close(fd_hdr);
258 return rb;
259
260 cleanup_data:
261 if (flags & QB_RB_FLAG_CREATE) {
262 unlink(rb->shared_hdr->data_path);
263 }
264
265 cleanup_hdr:
266 if (fd_hdr >= 0) {
267 close(fd_hdr);
268 }
269 if (rb && (rb->shared_hdr != MAP_FAILED) && (flags & QB_RB_FLAG_CREATE)) {
270 unlink(rb->shared_hdr->hdr_path);
271 if (rb->notifier.destroy_fn) {
272 (void)rb->notifier.destroy_fn(rb->notifier.instance);
273 }
274 }
275 if (rb && (rb->shared_hdr != MAP_FAILED)) {
276 munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s));
277 }
278 free(rb);
279 errno = -error;
280 return NULL;
281 }
282
283
/*
 * Detach from the ringbuffer: drop one shared reference, then let
 * qb_rb_close_helper() unmap (and, for the creating side, unlink)
 * the shared files.  NULL rb is a no-op.
 */
void
qb_rb_close(struct qb_ringbuffer_s * rb)
{
	if (rb == NULL) {
		return;
	}
	qb_enter();

	(void)qb_atomic_int_dec_and_test(&rb->shared_hdr->ref_count);
	(void)qb_rb_close_helper(rb, rb->flags & QB_RB_FLAG_CREATE, QB_FALSE);
}
295
/*
 * Tear the ringbuffer down unconditionally: poison the shared
 * refcount (set to -1) so other attachers can tell it is gone,
 * then unlink the backing files regardless of who created them.
 * NULL rb is a no-op.
 */
void
qb_rb_force_close(struct qb_ringbuffer_s * rb)
{
	if (rb == NULL) {
		return;
	}
	qb_enter();

	qb_atomic_int_set(&rb->shared_hdr->ref_count, -1);
	(void)qb_rb_close_helper(rb, QB_TRUE, QB_TRUE);
}
307
308 char *
qb_rb_name_get(struct qb_ringbuffer_s * rb)309 qb_rb_name_get(struct qb_ringbuffer_s * rb)
310 {
311 if (rb == NULL) {
312 return NULL;
313 }
314 return rb->shared_hdr->hdr_path;
315 }
316
317 void *
qb_rb_shared_user_data_get(struct qb_ringbuffer_s * rb)318 qb_rb_shared_user_data_get(struct qb_ringbuffer_s * rb)
319 {
320 if (rb == NULL) {
321 return NULL;
322 }
323 return rb->shared_hdr->user_data;
324 }
325
326 int32_t
qb_rb_refcount_get(struct qb_ringbuffer_s * rb)327 qb_rb_refcount_get(struct qb_ringbuffer_s * rb)
328 {
329 if (rb == NULL) {
330 return -EINVAL;
331 }
332 return qb_atomic_int_get(&rb->shared_hdr->ref_count);
333 }
334
335 ssize_t
qb_rb_space_free(struct qb_ringbuffer_s * rb)336 qb_rb_space_free(struct qb_ringbuffer_s * rb)
337 {
338 uint32_t write_size;
339 uint32_t read_size;
340 size_t space_free = 0;
341
342 if (rb == NULL) {
343 return -EINVAL;
344 }
345 if (rb->notifier.space_used_fn) {
346 return (rb->shared_hdr->word_size * sizeof(uint32_t)) -
347 rb->notifier.space_used_fn(rb->notifier.instance);
348 }
349 write_size = rb->shared_hdr->write_pt;
350 read_size = rb->shared_hdr->read_pt;
351
352 if (write_size > read_size) {
353 space_free =
354 (read_size - write_size + rb->shared_hdr->word_size) - 1;
355 } else if (write_size < read_size) {
356 space_free = (read_size - write_size) - 1;
357 } else {
358 if (rb->notifier.q_len_fn && rb->notifier.q_len_fn(rb->notifier.instance) > 0) {
359 space_free = 0;
360 } else {
361 space_free = rb->shared_hdr->word_size;
362 }
363 }
364
365 /* word -> bytes */
366 return (space_free * sizeof(uint32_t));
367 }
368
369 ssize_t
qb_rb_space_used(struct qb_ringbuffer_s * rb)370 qb_rb_space_used(struct qb_ringbuffer_s * rb)
371 {
372 uint32_t write_size;
373 uint32_t read_size;
374 size_t space_used;
375
376 if (rb == NULL) {
377 return -EINVAL;
378 }
379 if (rb->notifier.space_used_fn) {
380 return rb->notifier.space_used_fn(rb->notifier.instance);
381 }
382 write_size = rb->shared_hdr->write_pt;
383 read_size = rb->shared_hdr->read_pt;
384
385 if (write_size > read_size) {
386 space_used = write_size - read_size;
387 } else if (write_size < read_size) {
388 space_used =
389 (write_size - read_size + rb->shared_hdr->word_size) - 1;
390 } else {
391 space_used = 0;
392 }
393 /* word -> bytes */
394 return (space_used * sizeof(uint32_t));
395 }
396
397 ssize_t
qb_rb_chunks_used(struct qb_ringbuffer_s * rb)398 qb_rb_chunks_used(struct qb_ringbuffer_s *rb)
399 {
400 if (rb == NULL) {
401 return -EINVAL;
402 }
403 if (rb->notifier.q_len_fn) {
404 return rb->notifier.q_len_fn(rb->notifier.instance);
405 }
406 return -ENOTSUP;
407 }
408
/*
 * Reserve space for a chunk of 'len' bytes at the current write
 * pointer and return a pointer to its payload area.  The chunk is
 * stamped MAGIC_ALLOC (in progress); qb_rb_chunk_commit() publishes
 * it to readers.  Returns NULL with errno set: EINVAL for a NULL rb,
 * EAGAIN when the ring is full and overwrite is disabled.
 */
void *
qb_rb_chunk_alloc(struct qb_ringbuffer_s * rb, size_t len)
{
	uint32_t write_pt;

	if (rb == NULL) {
		errno = EINVAL;
		return NULL;
	}
	/*
	 * Reclaim data if we are over writing and we need space
	 */
	if (rb->flags & QB_RB_FLAG_OVERWRITE) {
		/* overwrite mode: discard oldest chunks until we fit */
		while (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_MARGIN)) {
			int rc = _rb_chunk_reclaim(rb);
			if (rc != 0) {
				return NULL; /* errno already set */
			}
		}
	} else {
		if (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_MARGIN)) {
			errno = EAGAIN;
			return NULL;
		}
	}

	write_pt = rb->shared_hdr->write_pt;
	/*
	 * insert the chunk header
	 */
	rb->shared_data[write_pt] = 0;
	QB_RB_CHUNK_MAGIC_SET(rb, write_pt, QB_RB_CHUNK_MAGIC_ALLOC);

	/*
	 * return a pointer to the beginning of the chunk data
	 */
	return (void *)QB_RB_CHUNK_DATA_GET(rb, write_pt);

}
448
449 static uint32_t
qb_rb_chunk_step(struct qb_ringbuffer_s * rb,uint32_t pointer)450 qb_rb_chunk_step(struct qb_ringbuffer_s * rb, uint32_t pointer)
451 {
452 uint32_t chunk_size = QB_RB_CHUNK_SIZE_GET(rb, pointer);
453 /*
454 * skip over the chunk header
455 */
456 pointer += QB_RB_CHUNK_HEADER_WORDS;
457
458 /*
459 * skip over the user's data.
460 */
461 pointer += (chunk_size / sizeof(uint32_t));
462 /* make allowance for non-word sizes */
463 if ((chunk_size % (sizeof(uint32_t) * QB_RB_WORD_ALIGN)) != 0) {
464 pointer++;
465 }
466
467 idx_cache_line_step(pointer);
468 return pointer;
469 }
470
/*
 * Publish the chunk most recently reserved by qb_rb_chunk_alloc().
 * 'len' is the number of payload bytes actually used.
 * Ordering matters here: the size word and the advanced write
 * pointer are stored first, and the MAGIC word is set last with
 * release semantics (QB_RB_CHUNK_MAGIC_SET), so a reader that
 * observes the committed magic is guaranteed to see the rest of
 * the chunk in place.
 * Returns 0, the notifier post_fn result, or -EINVAL for NULL rb.
 */
int32_t
qb_rb_chunk_commit(struct qb_ringbuffer_s * rb, size_t len)
{
	uint32_t old_write_pt;

	if (rb == NULL) {
		return -EINVAL;
	}
	/*
	 * commit the magic & chunk_size
	 */
	old_write_pt = rb->shared_hdr->write_pt;
	rb->shared_data[old_write_pt] = len;

	/*
	 * commit the new write pointer
	 */
	rb->shared_hdr->write_pt = qb_rb_chunk_step(rb, old_write_pt);
	QB_RB_CHUNK_MAGIC_SET(rb, old_write_pt, QB_RB_CHUNK_MAGIC);

	DEBUG_PRINTF("commit [%zd] read: %u, write: %u -> %u (%u)\n",
		     (rb->notifier.q_len_fn ?
		      rb->notifier.q_len_fn(rb->notifier.instance) : 0),
		     rb->shared_hdr->read_pt,
		     old_write_pt,
		     rb->shared_hdr->write_pt,
		     rb->shared_hdr->word_size);

	/*
	 * post the notification to the reader
	 */
	if (rb->notifier.post_fn) {
		return rb->notifier.post_fn(rb->notifier.instance, len);
	}
	return 0;
}
507
508 ssize_t
qb_rb_chunk_write(struct qb_ringbuffer_s * rb,const void * data,size_t len)509 qb_rb_chunk_write(struct qb_ringbuffer_s * rb, const void *data, size_t len)
510 {
511 char *dest = qb_rb_chunk_alloc(rb, len);
512 int32_t res = 0;
513
514 if (rb == NULL) {
515 return -EINVAL;
516 }
517
518 if (dest == NULL) {
519 return -errno;
520 }
521
522 memcpy(dest, data, len);
523
524 res = qb_rb_chunk_commit(rb, len);
525 if (res < 0) {
526 return res;
527 }
528
529 return len;
530 }
531
/*
 * Release the chunk at the current read pointer.
 * The header is cleared and the chunk marked DEAD *before* the read
 * pointer is advanced (see the comment below for why), then the
 * reclaim notifier is invoked so semaphore counts stay in sync.
 * Returns 0 on success, or a negative errno when the head chunk is
 * not a committed chunk or reclaim_fn fails.
 */
static int
_rb_chunk_reclaim(struct qb_ringbuffer_s * rb)
{
	uint32_t old_read_pt;
	uint32_t new_read_pt;
	uint32_t old_chunk_size;
	uint32_t chunk_magic;
	int rc = 0;

	old_read_pt = rb->shared_hdr->read_pt;
	chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, old_read_pt);
	/* nothing to reclaim unless a committed chunk heads the ring */
	if (chunk_magic != QB_RB_CHUNK_MAGIC) {
		errno = EINVAL;
		return -errno;
	}

	old_chunk_size = QB_RB_CHUNK_SIZE_GET(rb, old_read_pt);
	new_read_pt = qb_rb_chunk_step(rb, old_read_pt);

	/*
	 * clear the header
	 */
	rb->shared_data[old_read_pt] = 0;
	QB_RB_CHUNK_MAGIC_SET(rb, old_read_pt, QB_RB_CHUNK_MAGIC_DEAD);

	/*
	 * set the new read pointer after clearing the header
	 * to prevent a situation where a fast writer will write their
	 * new chunk between setting the new read pointer and clearing the
	 * header.
	 */
	rb->shared_hdr->read_pt = new_read_pt;

	if (rb->notifier.reclaim_fn) {
		rc = rb->notifier.reclaim_fn(rb->notifier.instance,
					     old_chunk_size);
		if (rc < 0) {
			errno = -rc;
			qb_util_perror(LOG_WARNING, "reclaim_fn");
		}
	}

	DEBUG_PRINTF("reclaim [%zd]: read: %u -> %u, write: %u\n",
		     (rb->notifier.q_len_fn ?
		      rb->notifier.q_len_fn(rb->notifier.instance) : 0),
		     old_read_pt,
		     rb->shared_hdr->read_pt,
		     rb->shared_hdr->write_pt);

	return rc;
}
583
/*
 * Public wrapper for _rb_chunk_reclaim(); tolerates a NULL rb and
 * discards the result.
 */
void
qb_rb_chunk_reclaim(struct qb_ringbuffer_s * rb)
{
	if (rb != NULL) {
		_rb_chunk_reclaim(rb);
	}
}
592
/*
 * Wait (per 'timeout', via the timedwait notifier if registered) for
 * a chunk and return a zero-copy pointer to its payload in *data_out.
 * The chunk stays in the ring until qb_rb_chunk_reclaim() is called.
 * Returns the chunk size in bytes, 0 on timeout, or a negative errno
 * (-EBADMSG/-EINVAL when the head chunk is not a committed chunk).
 */
ssize_t
qb_rb_chunk_peek(struct qb_ringbuffer_s * rb, void **data_out, int32_t timeout)
{
	uint32_t read_pt;
	uint32_t chunk_size;
	uint32_t chunk_magic;
	int32_t res = 0;

	if (rb == NULL) {
		return -EINVAL;
	}
	if (rb->notifier.timedwait_fn) {
		res = rb->notifier.timedwait_fn(rb->notifier.instance, timeout);
	}
	/* -EIDRM (identifier removed) is tolerated: still try to read */
	if (res < 0 && res != -EIDRM) {
		if (res == -ETIMEDOUT) {
			return 0;
		} else {
			errno = -res;
			qb_util_perror(LOG_ERR, "sem_timedwait");
		}
		return res;
	}
	read_pt = rb->shared_hdr->read_pt;
	chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt);
	if (chunk_magic != QB_RB_CHUNK_MAGIC) {
		/* head chunk not committed: re-post the wakeup we
		 * consumed so the notification is not lost */
		if (rb->notifier.post_fn) {
			(void)rb->notifier.post_fn(rb->notifier.instance, res);
		}
#ifdef EBADMSG
		return -EBADMSG;
#else
		return -EINVAL;
#endif
	}
	chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt);
	*data_out = QB_RB_CHUNK_DATA_GET(rb, read_pt);
	return chunk_size;
}
632
633 ssize_t
qb_rb_chunk_read(struct qb_ringbuffer_s * rb,void * data_out,size_t len,int32_t timeout)634 qb_rb_chunk_read(struct qb_ringbuffer_s * rb, void *data_out, size_t len,
635 int32_t timeout)
636 {
637 uint32_t read_pt;
638 uint32_t chunk_size;
639 uint32_t chunk_magic;
640 int32_t res = 0;
641
642 if (rb == NULL) {
643 return -EINVAL;
644 }
645 if (rb->notifier.timedwait_fn) {
646 res = rb->notifier.timedwait_fn(rb->notifier.instance, timeout);
647 }
648 if (res < 0 && res != -EIDRM) {
649 if (res != -ETIMEDOUT) {
650 errno = -res;
651 qb_util_perror(LOG_ERR, "sem_timedwait");
652 }
653 return res;
654 }
655
656 read_pt = rb->shared_hdr->read_pt;
657 chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt);
658
659 if (chunk_magic != QB_RB_CHUNK_MAGIC) {
660 if (rb->notifier.timedwait_fn == NULL) {
661 return -ETIMEDOUT;
662 } else {
663 (void)rb->notifier.post_fn(rb->notifier.instance, res);
664 #ifdef EBADMSG
665 return -EBADMSG;
666 #else
667 return -EINVAL;
668 #endif
669 }
670 }
671
672 chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt);
673 if (len < chunk_size) {
674 qb_util_log(LOG_ERR,
675 "trying to recv chunk of size %d but %d available",
676 len, chunk_size);
677 if (rb->notifier.post_fn) {
678 (void)rb->notifier.post_fn(rb->notifier.instance, chunk_size);
679 }
680 return -ENOBUFS;
681 }
682
683 memcpy(data_out,
684 QB_RB_CHUNK_DATA_GET(rb, read_pt),
685 chunk_size);
686
687 _rb_chunk_reclaim(rb);
688
689 return chunk_size;
690 }
691
692 static void
print_header(struct qb_ringbuffer_s * rb)693 print_header(struct qb_ringbuffer_s * rb)
694 {
695 printf("Ringbuffer: \n");
696 if (rb->flags & QB_RB_FLAG_OVERWRITE) {
697 printf(" ->OVERWRITE\n");
698 } else {
699 printf(" ->NORMAL\n");
700 }
701 #ifndef S_SPLINT_S
702 printf(" ->write_pt [%" PRIu32 "]\n", rb->shared_hdr->write_pt);
703 printf(" ->read_pt [%" PRIu32 "]\n", rb->shared_hdr->read_pt);
704 printf(" ->size [%" PRIu32 " words]\n", rb->shared_hdr->word_size);
705 printf(" =>free [%zd bytes]\n", qb_rb_space_free(rb));
706 printf(" =>used [%zd bytes]\n", qb_rb_space_used(rb));
707 #endif /* S_SPLINT_S */
708 }
709
710 /*
711 * FILE HEADER ORDER
712 * 1. word_size
713 * 2. write_pt
714 * 3. read_pt
715 * 4. version
716 * 5. header_hash
717 *
718 * 6. data
719 */
720
721 ssize_t
qb_rb_write_to_file(struct qb_ringbuffer_s * rb,int32_t fd)722 qb_rb_write_to_file(struct qb_ringbuffer_s * rb, int32_t fd)
723 {
724 ssize_t result;
725 ssize_t written_size = 0;
726 uint32_t hash = 0;
727 uint32_t version = QB_RB_FILE_HEADER_VERSION;
728
729 if (rb == NULL) {
730 return -EINVAL;
731 }
732 print_header(rb);
733
734 /*
735 * 1. word_size
736 */
737 result = write(fd, &rb->shared_hdr->word_size, sizeof(uint32_t));
738 if (result != sizeof(uint32_t)) {
739 return -errno;
740 }
741 written_size += result;
742
743 /*
744 * 2. 3. store the read & write pointers
745 */
746 result = write(fd, (void *)&rb->shared_hdr->write_pt, sizeof(uint32_t));
747 if (result != sizeof(uint32_t)) {
748 return -errno;
749 }
750 written_size += result;
751 result = write(fd, (void *)&rb->shared_hdr->read_pt, sizeof(uint32_t));
752 if (result != sizeof(uint32_t)) {
753 return -errno;
754 }
755 written_size += result;
756
757 /*
758 * 4. version used
759 */
760 result = write(fd, &version, sizeof(uint32_t));
761 if (result != sizeof(uint32_t)) {
762 return -errno;
763 }
764 written_size += result;
765
766 /*
767 * 5. hash helps us verify header is not corrupted on file read
768 */
769 hash = rb->shared_hdr->word_size + rb->shared_hdr->write_pt + rb->shared_hdr->read_pt + QB_RB_FILE_HEADER_VERSION;
770 result = write(fd, &hash, sizeof(uint32_t));
771 if (result != sizeof(uint32_t)) {
772 return -errno;
773 }
774 written_size += result;
775
776 result = write(fd, rb->shared_data,
777 rb->shared_hdr->word_size * sizeof(uint32_t));
778 if (result != rb->shared_hdr->word_size * sizeof(uint32_t)) {
779 return -errno;
780 }
781 written_size += result;
782
783 qb_util_log(LOG_DEBUG, " writing total of: %zd\n", written_size);
784
785 return written_size;
786 }
787
788 qb_ringbuffer_t *
qb_rb_create_from_file(int32_t fd,uint32_t flags)789 qb_rb_create_from_file(int32_t fd, uint32_t flags)
790 {
791 ssize_t n_read;
792 size_t n_required;
793 size_t total_read = 0;
794 uint32_t read_pt;
795 uint32_t write_pt;
796 struct qb_ringbuffer_s *rb;
797 uint32_t word_size = 0;
798 uint32_t version = 0;
799 uint32_t hash = 0;
800 uint32_t calculated_hash = 0;
801
802 if (fd < 0) {
803 return NULL;
804 }
805
806 /*
807 * 1. word size
808 */
809 n_required = sizeof(uint32_t);
810 n_read = read(fd, &word_size, n_required);
811 if (n_read != n_required) {
812 qb_util_perror(LOG_ERR, "Unable to read blackbox file header");
813 return NULL;
814 }
815 total_read += n_read;
816
817 /*
818 * 2. 3. read & write pointers
819 */
820 n_read = read(fd, &write_pt, sizeof(uint32_t));
821 assert(n_read == sizeof(uint32_t));
822 total_read += n_read;
823
824 n_read = read(fd, &read_pt, sizeof(uint32_t));
825 assert(n_read == sizeof(uint32_t));
826 total_read += n_read;
827
828 /*
829 * 4. version
830 */
831 n_required = sizeof(uint32_t);
832 n_read = read(fd, &version, n_required);
833 if (n_read != n_required) {
834 qb_util_perror(LOG_ERR, "Unable to read blackbox file header");
835 return NULL;
836 }
837 total_read += n_read;
838
839 /*
840 * 5. Hash
841 */
842 n_required = sizeof(uint32_t);
843 n_read = read(fd, &hash, n_required);
844 if (n_read != n_required) {
845 qb_util_perror(LOG_ERR, "Unable to read blackbox file header");
846 return NULL;
847 }
848 total_read += n_read;
849
850 calculated_hash = word_size + write_pt + read_pt + version;
851 if (hash != calculated_hash) {
852 qb_util_log(LOG_ERR, "Corrupt blackbox: File header hash (%d) does not match calculated hash (%d)", hash, calculated_hash);
853 return NULL;
854 } else if (version != QB_RB_FILE_HEADER_VERSION) {
855 qb_util_log(LOG_ERR, "Wrong file header version. Expected %d got %d",
856 QB_RB_FILE_HEADER_VERSION, version);
857 return NULL;
858 }
859
860 /*
861 * 6. data
862 */
863 n_required = (word_size * sizeof(uint32_t));
864
865 /*
866 * qb_rb_open adds QB_RB_CHUNK_MARGIN + 1 to the requested size.
867 */
868 rb = qb_rb_open("create_from_file", n_required - (QB_RB_CHUNK_MARGIN + 1),
869 QB_RB_FLAG_CREATE | QB_RB_FLAG_NO_SEMAPHORE, 0);
870 if (rb == NULL) {
871 return NULL;
872 }
873 rb->shared_hdr->read_pt = read_pt;
874 rb->shared_hdr->write_pt = write_pt;
875
876 n_read = read(fd, rb->shared_data, n_required);
877 if (n_read < 0) {
878 qb_util_perror(LOG_ERR, "Unable to read blackbox file data");
879 goto cleanup_fail;
880 }
881 total_read += n_read;
882
883 if (n_read != n_required) {
884 qb_util_log(LOG_WARNING, "read %zd bytes, but expected %zu",
885 n_read, n_required);
886 goto cleanup_fail;
887 }
888
889 qb_util_log(LOG_DEBUG, "read total of: %zd", total_read);
890 print_header(rb);
891
892 return rb;
893
894 cleanup_fail:
895 qb_rb_close(rb);
896 return NULL;
897 }
898
899 int32_t
qb_rb_chown(struct qb_ringbuffer_s * rb,uid_t owner,gid_t group)900 qb_rb_chown(struct qb_ringbuffer_s * rb, uid_t owner, gid_t group)
901 {
902 int32_t res;
903
904 if (rb == NULL) {
905 return -EINVAL;
906 }
907 res = chown(rb->shared_hdr->data_path, owner, group);
908 if (res < 0 && errno != EPERM) {
909 return -errno;
910 }
911 res = chown(rb->shared_hdr->hdr_path, owner, group);
912 if (res < 0 && errno != EPERM) {
913 return -errno;
914 }
915 return 0;
916 }
917
918 int32_t
qb_rb_chmod(qb_ringbuffer_t * rb,mode_t mode)919 qb_rb_chmod(qb_ringbuffer_t * rb, mode_t mode)
920 {
921 int32_t res;
922
923 if (rb == NULL) {
924 return -EINVAL;
925 }
926 res = chmod(rb->shared_hdr->data_path, mode);
927 if (res < 0) {
928 return -errno;
929 }
930 res = chmod(rb->shared_hdr->hdr_path, mode);
931 if (res < 0) {
932 return -errno;
933 }
934 return 0;
935 }
936