1 /*
2  * Copyright (C) 2010-2011 Red Hat, Inc.
3  *
4  * Author: Angus Salkeld <asalkeld@redhat.com>
5  *
6  * This file is part of libqb.
7  *
8  * libqb is free software: you can redistribute it and/or modify
9  * it under the terms of the GNU Lesser General Public License as published by
10  * the Free Software Foundation, either version 2.1 of the License, or
11  * (at your option) any later version.
12  *
13  * libqb is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public License
19  * along with libqb.  If not, see <http://www.gnu.org/licenses/>.
20  */
21 #include "ringbuffer_int.h"
22 #include <qb/qbdefs.h>
23 #include "atomic_int.h"
24 
25 #define QB_RB_FILE_HEADER_VERSION 1
26 
27 /*
28  * #define CRAZY_DEBUG_PRINTFS 1
29  */
30 #ifdef CRAZY_DEBUG_PRINTFS
31 #define DEBUG_PRINTF(format, args...)	\
32 do {				\
33 	printf(format, ##args);	\
34 } while(0)
35 #else
36 #define DEBUG_PRINTF(format, args...)
37 #endif /* CRAZY_DEBUG_PRINTFS */
38 
39 /*
40  * move the write pointer to the next 128 byte boundary
41  * write_pt goes in 4 bytes (sizeof(uint32_t))
42  * #define USE_CACHE_LINE_ALIGNMENT 1
43  */
44 #ifdef USE_CACHE_LINE_ALIGNMENT
45 #define QB_CACHE_LINE_SIZE 128
46 #define QB_CACHE_LINE_WORDS (QB_CACHE_LINE_SIZE/sizeof(uint32_t))
47 #define idx_cache_line_step(idx)	\
48 do {					\
49 	if (idx % QB_CACHE_LINE_WORDS) {			\
50 		idx += (QB_CACHE_LINE_WORDS - (idx % QB_CACHE_LINE_WORDS));	\
51 	}				\
52 	if (idx > (rb->shared_hdr->word_size - 1)) {		\
53 		idx = ((idx) % (rb->shared_hdr->word_size));	\
54 	}						\
55 } while (0)
56 #else
57 #define QB_CACHE_LINE_SIZE 0
58 #define QB_CACHE_LINE_WORDS 0
59 #define idx_cache_line_step(idx)			\
60 do {							\
61 	if (idx > (rb->shared_hdr->word_size - 1)) {		\
62 		idx = ((idx) % (rb->shared_hdr->word_size));	\
63 	}						\
64 } while (0)
65 #endif
66 
67 
68 /* the chunk header is two words
69  * 1) the chunk data size
70  * 2) the magic number
71  */
72 #define QB_RB_CHUNK_HEADER_WORDS 2
73 #define QB_RB_CHUNK_HEADER_SIZE (sizeof(uint32_t) * QB_RB_CHUNK_HEADER_WORDS)
74 /*
75  * margin is the gap we leave when checking to see if we have enough
76  * space for a new chunk.
77  * So:
78  * qb_rb_space_free() >= QB_RB_CHUNK_MARGIN + new data chunk
79  * The extra word size is to allow for non word sized data chunks.
80  * QB_CACHE_LINE_WORDS is to make sure we have space to align the
81  * chunk.
82  */
83 #define QB_RB_WORD_ALIGN 1
84 #define QB_RB_CHUNK_MARGIN (sizeof(uint32_t) * (QB_RB_CHUNK_HEADER_WORDS +\
85 						QB_RB_WORD_ALIGN +\
86 						QB_CACHE_LINE_WORDS))
87 #define QB_RB_CHUNK_MAGIC		0xA1A1A1A1
88 #define QB_RB_CHUNK_MAGIC_DEAD		0xD0D0D0D0
89 #define QB_RB_CHUNK_MAGIC_ALLOC		0xA110CED0
90 #define QB_RB_CHUNK_SIZE_GET(rb, pointer) rb->shared_data[pointer]
91 #define QB_RB_CHUNK_MAGIC_GET(rb, pointer) \
92 	qb_atomic_int_get_ex((int32_t*)&rb->shared_data[(pointer + 1) % rb->shared_hdr->word_size], \
93                              QB_ATOMIC_ACQUIRE)
94 #define QB_RB_CHUNK_MAGIC_SET(rb, pointer, new_val) \
95 	qb_atomic_int_set_ex((int32_t*)&rb->shared_data[(pointer + 1) % rb->shared_hdr->word_size], \
96 			     new_val, QB_ATOMIC_RELEASE)
97 #define QB_RB_CHUNK_DATA_GET(rb, pointer) \
98 	&rb->shared_data[(pointer + QB_RB_CHUNK_HEADER_WORDS) % rb->shared_hdr->word_size]
99 
100 #define QB_MAGIC_ASSERT(_ptr_) \
101 do {							\
102 	uint32_t chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, _ptr_); \
103 	if (chunk_magic != QB_RB_CHUNK_MAGIC) print_header(rb); \
104 	assert(chunk_magic == QB_RB_CHUNK_MAGIC); \
105 } while (0)
106 
107 #define idx_step(idx)					\
108 do {							\
109 	if (idx > (rb->shared_hdr->word_size - 1)) {		\
110 		idx = ((idx) % (rb->shared_hdr->word_size));	\
111 	}						\
112 } while (0)
113 
114 static void print_header(struct qb_ringbuffer_s * rb);
115 static int _rb_chunk_reclaim(struct qb_ringbuffer_s * rb);
116 
117 qb_ringbuffer_t *
qb_rb_open(const char * name,size_t size,uint32_t flags,size_t shared_user_data_size)118 qb_rb_open(const char *name, size_t size, uint32_t flags,
119 	   size_t shared_user_data_size)
120 {
121 	return qb_rb_open_2(name, size, flags, shared_user_data_size, NULL);
122 }
123 
124 qb_ringbuffer_t *
qb_rb_open_2(const char * name,size_t size,uint32_t flags,size_t shared_user_data_size,struct qb_rb_notifier * notifiers)125 qb_rb_open_2(const char *name, size_t size, uint32_t flags,
126 	     size_t shared_user_data_size,
127 	     struct qb_rb_notifier *notifiers)
128 {
129 	struct qb_ringbuffer_s *rb;
130 	size_t real_size;
131 	size_t shared_size;
132 	char path[PATH_MAX];
133 	int32_t fd_hdr;
134 	int32_t fd_data;
135 	uint32_t file_flags = O_RDWR;
136 	char filename[PATH_MAX];
137 	int32_t error = 0;
138 	void *shm_addr;
139 	long page_size = sysconf(_SC_PAGESIZE);
140 
141 #ifdef QB_ARCH_HPPA
142 	page_size = QB_MAX(page_size, 0x00400000); /* align to page colour */
143 #elif defined(QB_FORCE_SHM_ALIGN)
144 	page_size = QB_MAX(page_size, 16 * 1024);
145 #endif /* QB_FORCE_SHM_ALIGN */
146 	/* The user of this api expects the 'size' parameter passed into this function
147 	 * to be reflective of the max size single write we can do to the
148 	 * ringbuffer.  This means we have to add both the 'margin' space used
149 	 * to calculate if there is enough space for a new chunk as well as the '+1' that
150 	 * prevents overlap of the read/write pointers */
151 	size += QB_RB_CHUNK_MARGIN + 1;
152 	real_size = QB_ROUNDUP(size, page_size);
153 
154 	shared_size =
155 	    sizeof(struct qb_ringbuffer_shared_s) + shared_user_data_size;
156 
157 	if (flags & QB_RB_FLAG_CREATE) {
158 		file_flags |= O_CREAT | O_TRUNC | O_EXCL;
159 	}
160 
161 	rb = calloc(1, sizeof(struct qb_ringbuffer_s));
162 	if (rb == NULL) {
163 		return NULL;
164 	}
165 
166 	/*
167 	 * Create a shared_hdr memory segment for the header.
168 	 */
169 	snprintf(filename, PATH_MAX, "%s-header", name);
170 	fd_hdr = qb_sys_mmap_file_open(path, filename,
171 				       shared_size, file_flags);
172 	if (fd_hdr < 0) {
173 		error = fd_hdr;
174 		qb_util_log(LOG_ERR, "couldn't create file for mmap");
175 		goto cleanup_hdr;
176 	}
177 
178 	rb->shared_hdr = mmap(0,
179 			      shared_size,
180 			      PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);
181 
182 	if (rb->shared_hdr == MAP_FAILED) {
183 		error = -errno;
184 		qb_util_log(LOG_ERR, "couldn't create mmap for header");
185 		goto cleanup_hdr;
186 	}
187 	qb_atomic_init();
188 
189 	rb->flags = flags;
190 
191 	/*
192 	 * create the semaphore
193 	 */
194 	if (flags & QB_RB_FLAG_CREATE) {
195 		rb->shared_data = NULL;
196 		/* rb->shared_hdr->word_size tracks data by ints and not bytes/chars. */
197 		rb->shared_hdr->word_size = real_size / sizeof(uint32_t);
198 		rb->shared_hdr->write_pt = 0;
199 		rb->shared_hdr->read_pt = 0;
200 		(void)strlcpy(rb->shared_hdr->hdr_path, path, PATH_MAX);
201 	}
202 	if (notifiers && notifiers->post_fn) {
203 		error = 0;
204 		memcpy(&rb->notifier,
205 		       notifiers,
206 		       sizeof(struct qb_rb_notifier));
207 	} else {
208 		error = qb_rb_sem_create(rb, flags);
209 	}
210 	if (error < 0) {
211 		errno = -error;
212 		qb_util_perror(LOG_ERR, "couldn't create a semaphore");
213 		goto cleanup_hdr;
214 	}
215 
216 	/* Create the shared_data memory segment for the actual ringbuffer.
217 	 * They have to be separate.
218 	 */
219 	if (flags & QB_RB_FLAG_CREATE) {
220 		snprintf(filename, PATH_MAX, "%s-data", name);
221 		fd_data = qb_sys_mmap_file_open(path,
222 						filename,
223 						real_size, file_flags);
224 		(void)strlcpy(rb->shared_hdr->data_path, path, PATH_MAX);
225 	} else {
226 		fd_data = qb_sys_mmap_file_open(path,
227 						rb->shared_hdr->data_path,
228 						real_size, file_flags);
229 	}
230 	if (fd_data < 0) {
231 		error = fd_data;
232 		qb_util_log(LOG_ERR, "couldn't create file for mmap");
233 		goto cleanup_hdr;
234 	}
235 
236 	qb_util_log(LOG_DEBUG,
237 		    "shm size:%ld; real_size:%ld; rb->word_size:%d", size,
238 		    real_size, rb->shared_hdr->word_size);
239 
240 	/* this function closes fd_data */
241 	error = qb_sys_circular_mmap(fd_data, &shm_addr, real_size);
242 	rb->shared_data = shm_addr;
243 	if (error != 0) {
244 		qb_util_log(LOG_ERR, "couldn't create circular mmap on %s",
245 			    rb->shared_hdr->data_path);
246 		goto cleanup_data;
247 	}
248 
249 	if (flags & QB_RB_FLAG_CREATE) {
250 		memset(rb->shared_data, 0, real_size);
251 		rb->shared_data[rb->shared_hdr->word_size] = 5;
252 		rb->shared_hdr->ref_count = 1;
253 	} else {
254 		qb_atomic_int_inc(&rb->shared_hdr->ref_count);
255 	}
256 
257 	close(fd_hdr);
258 	return rb;
259 
260 cleanup_data:
261 	if (flags & QB_RB_FLAG_CREATE) {
262 		unlink(rb->shared_hdr->data_path);
263 	}
264 
265 cleanup_hdr:
266 	if (fd_hdr >= 0) {
267 		close(fd_hdr);
268 	}
269 	if (rb && (rb->shared_hdr != MAP_FAILED) && (flags & QB_RB_FLAG_CREATE)) {
270 		unlink(rb->shared_hdr->hdr_path);
271 		if (rb->notifier.destroy_fn) {
272 			(void)rb->notifier.destroy_fn(rb->notifier.instance);
273 		}
274 	}
275 	if (rb && (rb->shared_hdr != MAP_FAILED)) {
276 		munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s));
277 	}
278 	free(rb);
279 	errno = -error;
280 	return NULL;
281 }
282 
283 
284 void
qb_rb_close(struct qb_ringbuffer_s * rb)285 qb_rb_close(struct qb_ringbuffer_s * rb)
286 {
287 	if (rb == NULL) {
288 		return;
289 	}
290 	qb_enter();
291 
292 	(void)qb_atomic_int_dec_and_test(&rb->shared_hdr->ref_count);
293 	(void)qb_rb_close_helper(rb, rb->flags & QB_RB_FLAG_CREATE, QB_FALSE);
294 }
295 
296 void
qb_rb_force_close(struct qb_ringbuffer_s * rb)297 qb_rb_force_close(struct qb_ringbuffer_s * rb)
298 {
299 	if (rb == NULL) {
300 		return;
301 	}
302 	qb_enter();
303 
304 	qb_atomic_int_set(&rb->shared_hdr->ref_count, -1);
305 	(void)qb_rb_close_helper(rb, QB_TRUE, QB_TRUE);
306 }
307 
308 char *
qb_rb_name_get(struct qb_ringbuffer_s * rb)309 qb_rb_name_get(struct qb_ringbuffer_s * rb)
310 {
311 	if (rb == NULL) {
312 		return NULL;
313 	}
314 	return rb->shared_hdr->hdr_path;
315 }
316 
317 void *
qb_rb_shared_user_data_get(struct qb_ringbuffer_s * rb)318 qb_rb_shared_user_data_get(struct qb_ringbuffer_s * rb)
319 {
320 	if (rb == NULL) {
321 		return NULL;
322 	}
323 	return rb->shared_hdr->user_data;
324 }
325 
326 int32_t
qb_rb_refcount_get(struct qb_ringbuffer_s * rb)327 qb_rb_refcount_get(struct qb_ringbuffer_s * rb)
328 {
329 	if (rb == NULL) {
330 		return -EINVAL;
331 	}
332 	return qb_atomic_int_get(&rb->shared_hdr->ref_count);
333 }
334 
335 ssize_t
qb_rb_space_free(struct qb_ringbuffer_s * rb)336 qb_rb_space_free(struct qb_ringbuffer_s * rb)
337 {
338 	uint32_t write_size;
339 	uint32_t read_size;
340 	size_t space_free = 0;
341 
342 	if (rb == NULL) {
343 		return -EINVAL;
344 	}
345 	if (rb->notifier.space_used_fn) {
346 		return (rb->shared_hdr->word_size * sizeof(uint32_t)) -
347 			rb->notifier.space_used_fn(rb->notifier.instance);
348 	}
349 	write_size = rb->shared_hdr->write_pt;
350 	read_size = rb->shared_hdr->read_pt;
351 
352 	if (write_size > read_size) {
353 		space_free =
354 		    (read_size - write_size + rb->shared_hdr->word_size) - 1;
355 	} else if (write_size < read_size) {
356 		space_free = (read_size - write_size) - 1;
357 	} else {
358 		if (rb->notifier.q_len_fn && rb->notifier.q_len_fn(rb->notifier.instance) > 0) {
359 			space_free = 0;
360 		} else {
361 			space_free = rb->shared_hdr->word_size;
362 		}
363 	}
364 
365 	/* word -> bytes */
366 	return (space_free * sizeof(uint32_t));
367 }
368 
369 ssize_t
qb_rb_space_used(struct qb_ringbuffer_s * rb)370 qb_rb_space_used(struct qb_ringbuffer_s * rb)
371 {
372 	uint32_t write_size;
373 	uint32_t read_size;
374 	size_t space_used;
375 
376 	if (rb == NULL) {
377 		return -EINVAL;
378 	}
379 	if (rb->notifier.space_used_fn) {
380 		return rb->notifier.space_used_fn(rb->notifier.instance);
381 	}
382 	write_size = rb->shared_hdr->write_pt;
383 	read_size = rb->shared_hdr->read_pt;
384 
385 	if (write_size > read_size) {
386 		space_used = write_size - read_size;
387 	} else if (write_size < read_size) {
388 		space_used =
389 		    (write_size - read_size + rb->shared_hdr->word_size) - 1;
390 	} else {
391 		space_used = 0;
392 	}
393 	/* word -> bytes */
394 	return (space_used * sizeof(uint32_t));
395 }
396 
397 ssize_t
qb_rb_chunks_used(struct qb_ringbuffer_s * rb)398 qb_rb_chunks_used(struct qb_ringbuffer_s *rb)
399 {
400 	if (rb == NULL) {
401 		return -EINVAL;
402 	}
403 	if (rb->notifier.q_len_fn) {
404 		return rb->notifier.q_len_fn(rb->notifier.instance);
405 	}
406 	return -ENOTSUP;
407 }
408 
409 void *
qb_rb_chunk_alloc(struct qb_ringbuffer_s * rb,size_t len)410 qb_rb_chunk_alloc(struct qb_ringbuffer_s * rb, size_t len)
411 {
412 	uint32_t write_pt;
413 
414 	if (rb == NULL) {
415 		errno = EINVAL;
416 		return NULL;
417 	}
418 	/*
419 	 * Reclaim data if we are over writing and we need space
420 	 */
421 	if (rb->flags & QB_RB_FLAG_OVERWRITE) {
422 		while (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_MARGIN)) {
423 			int rc = _rb_chunk_reclaim(rb);
424 			if (rc != 0) {
425 				return NULL;  /* errno already set */
426 			}
427 		}
428 	} else {
429 		if (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_MARGIN)) {
430 			errno = EAGAIN;
431 			return NULL;
432 		}
433 	}
434 
435 	write_pt = rb->shared_hdr->write_pt;
436 	/*
437 	 * insert the chunk header
438 	 */
439 	rb->shared_data[write_pt] = 0;
440 	QB_RB_CHUNK_MAGIC_SET(rb, write_pt, QB_RB_CHUNK_MAGIC_ALLOC);
441 
442 	/*
443 	 * return a pointer to the beginning of the chunk data
444 	 */
445 	return (void *)QB_RB_CHUNK_DATA_GET(rb, write_pt);
446 
447 }
448 
449 static uint32_t
qb_rb_chunk_step(struct qb_ringbuffer_s * rb,uint32_t pointer)450 qb_rb_chunk_step(struct qb_ringbuffer_s * rb, uint32_t pointer)
451 {
452 	uint32_t chunk_size = QB_RB_CHUNK_SIZE_GET(rb, pointer);
453 	/*
454 	 * skip over the chunk header
455 	 */
456 	pointer += QB_RB_CHUNK_HEADER_WORDS;
457 
458 	/*
459 	 * skip over the user's data.
460 	 */
461 	pointer += (chunk_size / sizeof(uint32_t));
462 	/* make allowance for non-word sizes */
463 	if ((chunk_size % (sizeof(uint32_t) * QB_RB_WORD_ALIGN)) != 0) {
464 		pointer++;
465 	}
466 
467 	idx_cache_line_step(pointer);
468 	return pointer;
469 }
470 
471 int32_t
qb_rb_chunk_commit(struct qb_ringbuffer_s * rb,size_t len)472 qb_rb_chunk_commit(struct qb_ringbuffer_s * rb, size_t len)
473 {
474 	uint32_t old_write_pt;
475 
476 	if (rb == NULL) {
477 		return -EINVAL;
478 	}
479 	/*
480 	 * commit the magic & chunk_size
481 	 */
482 	old_write_pt = rb->shared_hdr->write_pt;
483 	rb->shared_data[old_write_pt] = len;
484 
485 	/*
486 	 * commit the new write pointer
487 	 */
488 	rb->shared_hdr->write_pt = qb_rb_chunk_step(rb, old_write_pt);
489 	QB_RB_CHUNK_MAGIC_SET(rb, old_write_pt, QB_RB_CHUNK_MAGIC);
490 
491 	DEBUG_PRINTF("commit [%zd] read: %u, write: %u -> %u (%u)\n",
492 		     (rb->notifier.q_len_fn ?
493 		      rb->notifier.q_len_fn(rb->notifier.instance) : 0),
494 		     rb->shared_hdr->read_pt,
495 		     old_write_pt,
496 		     rb->shared_hdr->write_pt,
497 		     rb->shared_hdr->word_size);
498 
499 	/*
500 	 * post the notification to the reader
501 	 */
502 	if (rb->notifier.post_fn) {
503 		return rb->notifier.post_fn(rb->notifier.instance, len);
504 	}
505 	return 0;
506 }
507 
508 ssize_t
qb_rb_chunk_write(struct qb_ringbuffer_s * rb,const void * data,size_t len)509 qb_rb_chunk_write(struct qb_ringbuffer_s * rb, const void *data, size_t len)
510 {
511 	char *dest = qb_rb_chunk_alloc(rb, len);
512 	int32_t res = 0;
513 
514 	if (rb == NULL) {
515 		return -EINVAL;
516 	}
517 
518 	if (dest == NULL) {
519 		return -errno;
520 	}
521 
522 	memcpy(dest, data, len);
523 
524 	res = qb_rb_chunk_commit(rb, len);
525 	if (res < 0) {
526 		return res;
527 	}
528 
529 	return len;
530 }
531 
532 static int
_rb_chunk_reclaim(struct qb_ringbuffer_s * rb)533 _rb_chunk_reclaim(struct qb_ringbuffer_s * rb)
534 {
535 	uint32_t old_read_pt;
536 	uint32_t new_read_pt;
537 	uint32_t old_chunk_size;
538 	uint32_t chunk_magic;
539 	int rc = 0;
540 
541 	old_read_pt = rb->shared_hdr->read_pt;
542 	chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, old_read_pt);
543 	if (chunk_magic != QB_RB_CHUNK_MAGIC) {
544 		errno = EINVAL;
545 		return -errno;
546 	}
547 
548 	old_chunk_size = QB_RB_CHUNK_SIZE_GET(rb, old_read_pt);
549 	new_read_pt = qb_rb_chunk_step(rb, old_read_pt);
550 
551 	/*
552 	 * clear the header
553 	 */
554 	rb->shared_data[old_read_pt] = 0;
555 	QB_RB_CHUNK_MAGIC_SET(rb, old_read_pt, QB_RB_CHUNK_MAGIC_DEAD);
556 
557 	/*
558 	 * set the new read pointer after clearing the header
559 	 * to prevent a situation where a fast writer will write their
560 	 * new chunk between setting the new read pointer and clearing the
561 	 * header.
562 	 */
563 	rb->shared_hdr->read_pt = new_read_pt;
564 
565 	if (rb->notifier.reclaim_fn) {
566 		rc = rb->notifier.reclaim_fn(rb->notifier.instance,
567 						 old_chunk_size);
568 		if (rc < 0) {
569 			errno = -rc;
570 			qb_util_perror(LOG_WARNING, "reclaim_fn");
571 		}
572 	}
573 
574 	DEBUG_PRINTF("reclaim [%zd]: read: %u -> %u, write: %u\n",
575 		     (rb->notifier.q_len_fn ?
576 		      rb->notifier.q_len_fn(rb->notifier.instance) : 0),
577 		     old_read_pt,
578 		     rb->shared_hdr->read_pt,
579 		     rb->shared_hdr->write_pt);
580 
581 	return rc;
582 }
583 
584 void
qb_rb_chunk_reclaim(struct qb_ringbuffer_s * rb)585 qb_rb_chunk_reclaim(struct qb_ringbuffer_s * rb)
586 {
587 	if (rb == NULL) {
588 		return;
589 	}
590 	_rb_chunk_reclaim(rb);
591 }
592 
593 ssize_t
qb_rb_chunk_peek(struct qb_ringbuffer_s * rb,void ** data_out,int32_t timeout)594 qb_rb_chunk_peek(struct qb_ringbuffer_s * rb, void **data_out, int32_t timeout)
595 {
596 	uint32_t read_pt;
597 	uint32_t chunk_size;
598 	uint32_t chunk_magic;
599 	int32_t res = 0;
600 
601 	if (rb == NULL) {
602 		return -EINVAL;
603 	}
604 	if (rb->notifier.timedwait_fn) {
605 		res = rb->notifier.timedwait_fn(rb->notifier.instance, timeout);
606 	}
607 	if (res < 0 && res != -EIDRM) {
608 		if (res == -ETIMEDOUT) {
609 			return 0;
610 		} else {
611 			errno = -res;
612 			qb_util_perror(LOG_ERR, "sem_timedwait");
613 		}
614 		return res;
615 	}
616 	read_pt = rb->shared_hdr->read_pt;
617 	chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt);
618 	if (chunk_magic != QB_RB_CHUNK_MAGIC) {
619 		if (rb->notifier.post_fn) {
620 			(void)rb->notifier.post_fn(rb->notifier.instance, res);
621 		}
622 #ifdef EBADMSG
623 		return -EBADMSG;
624 #else
625 		return -EINVAL;
626 #endif
627 	}
628 	chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt);
629 	*data_out = QB_RB_CHUNK_DATA_GET(rb, read_pt);
630 	return chunk_size;
631 }
632 
633 ssize_t
qb_rb_chunk_read(struct qb_ringbuffer_s * rb,void * data_out,size_t len,int32_t timeout)634 qb_rb_chunk_read(struct qb_ringbuffer_s * rb, void *data_out, size_t len,
635 		 int32_t timeout)
636 {
637 	uint32_t read_pt;
638 	uint32_t chunk_size;
639 	uint32_t chunk_magic;
640 	int32_t res = 0;
641 
642 	if (rb == NULL) {
643 		return -EINVAL;
644 	}
645 	if (rb->notifier.timedwait_fn) {
646 		res = rb->notifier.timedwait_fn(rb->notifier.instance, timeout);
647 	}
648 	if (res < 0 && res != -EIDRM) {
649 		if (res != -ETIMEDOUT) {
650 			errno = -res;
651 			qb_util_perror(LOG_ERR, "sem_timedwait");
652 		}
653 		return res;
654 	}
655 
656 	read_pt = rb->shared_hdr->read_pt;
657 	chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt);
658 
659 	if (chunk_magic != QB_RB_CHUNK_MAGIC) {
660 		if (rb->notifier.timedwait_fn == NULL) {
661 			return -ETIMEDOUT;
662 		} else {
663 			(void)rb->notifier.post_fn(rb->notifier.instance, res);
664 #ifdef EBADMSG
665 			return -EBADMSG;
666 #else
667 			return -EINVAL;
668 #endif
669 		}
670 	}
671 
672 	chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt);
673 	if (len < chunk_size) {
674 		qb_util_log(LOG_ERR,
675 			    "trying to recv chunk of size %d but %d available",
676 			    len, chunk_size);
677 		if (rb->notifier.post_fn) {
678 			(void)rb->notifier.post_fn(rb->notifier.instance, chunk_size);
679 		}
680 		return -ENOBUFS;
681 	}
682 
683 	memcpy(data_out,
684 	       QB_RB_CHUNK_DATA_GET(rb, read_pt),
685 	       chunk_size);
686 
687 	_rb_chunk_reclaim(rb);
688 
689 	return chunk_size;
690 }
691 
692 static void
print_header(struct qb_ringbuffer_s * rb)693 print_header(struct qb_ringbuffer_s * rb)
694 {
695 	printf("Ringbuffer: \n");
696 	if (rb->flags & QB_RB_FLAG_OVERWRITE) {
697 		printf(" ->OVERWRITE\n");
698 	} else {
699 		printf(" ->NORMAL\n");
700 	}
701 #ifndef S_SPLINT_S
702 	printf(" ->write_pt [%" PRIu32 "]\n", rb->shared_hdr->write_pt);
703 	printf(" ->read_pt [%" PRIu32 "]\n", rb->shared_hdr->read_pt);
704 	printf(" ->size [%" PRIu32 " words]\n", rb->shared_hdr->word_size);
705 	printf(" =>free [%zd bytes]\n", qb_rb_space_free(rb));
706 	printf(" =>used [%zd bytes]\n", qb_rb_space_used(rb));
707 #endif /* S_SPLINT_S */
708 }
709 
710 /*
711  * FILE HEADER ORDER
712  * 1. word_size
713  * 2. write_pt
714  * 3. read_pt
715  * 4. version
716  * 5. header_hash
717  *
718  * 6. data
719  */
720 
721 ssize_t
qb_rb_write_to_file(struct qb_ringbuffer_s * rb,int32_t fd)722 qb_rb_write_to_file(struct qb_ringbuffer_s * rb, int32_t fd)
723 {
724 	ssize_t result;
725 	ssize_t written_size = 0;
726 	uint32_t hash = 0;
727 	uint32_t version = QB_RB_FILE_HEADER_VERSION;
728 
729 	if (rb == NULL) {
730 		return -EINVAL;
731 	}
732 	print_header(rb);
733 
734 	/*
735  	 * 1. word_size
736  	 */
737 	result = write(fd, &rb->shared_hdr->word_size, sizeof(uint32_t));
738 	if (result != sizeof(uint32_t)) {
739 		return -errno;
740 	}
741 	written_size += result;
742 
743 	/*
744 	 * 2. 3. store the read & write pointers
745 	 */
746 	result = write(fd, (void *)&rb->shared_hdr->write_pt, sizeof(uint32_t));
747 	if (result != sizeof(uint32_t)) {
748 		return -errno;
749 	}
750 	written_size += result;
751 	result = write(fd, (void *)&rb->shared_hdr->read_pt, sizeof(uint32_t));
752 	if (result != sizeof(uint32_t)) {
753 		return -errno;
754 	}
755 	written_size += result;
756 
757 	/*
758 	 * 4. version used
759 	 */
760 	result = write(fd, &version, sizeof(uint32_t));
761 	if (result != sizeof(uint32_t)) {
762 		return -errno;
763 	}
764 	written_size += result;
765 
766 	/*
767 	 * 5. hash helps us verify header is not corrupted on file read
768 	 */
769 	hash = rb->shared_hdr->word_size + rb->shared_hdr->write_pt + rb->shared_hdr->read_pt + QB_RB_FILE_HEADER_VERSION;
770 	result = write(fd, &hash, sizeof(uint32_t));
771 	if (result != sizeof(uint32_t)) {
772 		return -errno;
773 	}
774 	written_size += result;
775 
776 	result = write(fd, rb->shared_data,
777 		       rb->shared_hdr->word_size * sizeof(uint32_t));
778 	if (result != rb->shared_hdr->word_size * sizeof(uint32_t)) {
779 		return -errno;
780 	}
781 	written_size += result;
782 
783 	qb_util_log(LOG_DEBUG, " writing total of: %zd\n", written_size);
784 
785 	return written_size;
786 }
787 
788 qb_ringbuffer_t *
qb_rb_create_from_file(int32_t fd,uint32_t flags)789 qb_rb_create_from_file(int32_t fd, uint32_t flags)
790 {
791 	ssize_t n_read;
792 	size_t n_required;
793 	size_t total_read = 0;
794 	uint32_t read_pt;
795 	uint32_t write_pt;
796 	struct qb_ringbuffer_s *rb;
797 	uint32_t word_size = 0;
798 	uint32_t version = 0;
799 	uint32_t hash = 0;
800 	uint32_t calculated_hash = 0;
801 
802 	if (fd < 0) {
803 		return NULL;
804 	}
805 
806 	/*
807 	 * 1. word size
808 	 */
809 	n_required = sizeof(uint32_t);
810 	n_read = read(fd, &word_size, n_required);
811 	if (n_read != n_required) {
812 		qb_util_perror(LOG_ERR, "Unable to read blackbox file header");
813 		return NULL;
814 	}
815 	total_read += n_read;
816 
817 	/*
818 	 * 2. 3. read & write pointers
819 	 */
820 	n_read = read(fd, &write_pt, sizeof(uint32_t));
821 	assert(n_read == sizeof(uint32_t));
822 	total_read += n_read;
823 
824 	n_read = read(fd, &read_pt, sizeof(uint32_t));
825 	assert(n_read == sizeof(uint32_t));
826 	total_read += n_read;
827 
828 	/*
829 	 * 4. version
830 	 */
831 	n_required = sizeof(uint32_t);
832 	n_read = read(fd, &version, n_required);
833 	if (n_read != n_required) {
834 		qb_util_perror(LOG_ERR, "Unable to read blackbox file header");
835 		return NULL;
836 	}
837 	total_read += n_read;
838 
839 	/*
840 	 * 5. Hash
841 	 */
842 	n_required = sizeof(uint32_t);
843 	n_read = read(fd, &hash, n_required);
844 	if (n_read != n_required) {
845 		qb_util_perror(LOG_ERR, "Unable to read blackbox file header");
846 		return NULL;
847 	}
848 	total_read += n_read;
849 
850 	calculated_hash = word_size + write_pt + read_pt + version;
851 	if (hash != calculated_hash) {
852 		qb_util_log(LOG_ERR, "Corrupt blackbox: File header hash (%d) does not match calculated hash (%d)", hash, calculated_hash);
853 		return NULL;
854 	} else if (version != QB_RB_FILE_HEADER_VERSION) {
855 		qb_util_log(LOG_ERR, "Wrong file header version. Expected %d got %d",
856 			QB_RB_FILE_HEADER_VERSION, version);
857 		return NULL;
858 	}
859 
860 	/*
861 	 * 6. data
862 	 */
863 	n_required = (word_size * sizeof(uint32_t));
864 
865 	/*
866 	 * qb_rb_open adds QB_RB_CHUNK_MARGIN + 1 to the requested size.
867 	 */
868 	rb = qb_rb_open("create_from_file", n_required - (QB_RB_CHUNK_MARGIN + 1),
869 			QB_RB_FLAG_CREATE | QB_RB_FLAG_NO_SEMAPHORE, 0);
870 	if (rb == NULL) {
871 		return NULL;
872 	}
873 	rb->shared_hdr->read_pt = read_pt;
874 	rb->shared_hdr->write_pt = write_pt;
875 
876 	n_read = read(fd, rb->shared_data, n_required);
877 	if (n_read < 0) {
878 		qb_util_perror(LOG_ERR, "Unable to read blackbox file data");
879 		goto cleanup_fail;
880 	}
881 	total_read += n_read;
882 
883 	if (n_read != n_required) {
884 		qb_util_log(LOG_WARNING, "read %zd bytes, but expected %zu",
885 			    n_read, n_required);
886 		goto cleanup_fail;
887 	}
888 
889 	qb_util_log(LOG_DEBUG, "read total of: %zd", total_read);
890 	print_header(rb);
891 
892 	return rb;
893 
894 cleanup_fail:
895 	qb_rb_close(rb);
896 	return NULL;
897 }
898 
899 int32_t
qb_rb_chown(struct qb_ringbuffer_s * rb,uid_t owner,gid_t group)900 qb_rb_chown(struct qb_ringbuffer_s * rb, uid_t owner, gid_t group)
901 {
902 	int32_t res;
903 
904 	if (rb == NULL) {
905 		return -EINVAL;
906 	}
907 	res = chown(rb->shared_hdr->data_path, owner, group);
908 	if (res < 0 && errno != EPERM) {
909 		return -errno;
910 	}
911 	res = chown(rb->shared_hdr->hdr_path, owner, group);
912 	if (res < 0 && errno != EPERM) {
913 		return -errno;
914 	}
915 	return 0;
916 }
917 
918 int32_t
qb_rb_chmod(qb_ringbuffer_t * rb,mode_t mode)919 qb_rb_chmod(qb_ringbuffer_t * rb, mode_t mode)
920 {
921 	int32_t res;
922 
923 	if (rb == NULL) {
924 		return -EINVAL;
925 	}
926 	res = chmod(rb->shared_hdr->data_path, mode);
927 	if (res < 0) {
928 		return -errno;
929 	}
930 	res = chmod(rb->shared_hdr->hdr_path, mode);
931 	if (res < 0) {
932 		return -errno;
933 	}
934 	return 0;
935 }
936