1 /*
2  * Copyright (c) 2014 Intel Corporation.  All rights reserved.
3  * Copyright (c) 2016 Cisco Systems, Inc.  All rights reserved.
4  *
5  * This software is available to you under a choice of one of two
6  * licenses.  You may choose to be licensed under the terms of the GNU
7  * General Public License (GPL) Version 2, available from the file
8  * COPYING in the main directory of this source tree, or the
9  * BSD license below:
10  *
11  *     Redistribution and use in source and binary forms, with or
12  *     without modification, are permitted provided that the following
13  *     conditions are met:
14  *
15  *      - Redistributions of source code must retain the above
16  *        copyright notice, this list of conditions and the following
17  *        disclaimer.
18  *
19  *      - Redistributions in binary form must reproduce the above
20  *        copyright notice, this list of conditions and the following
21  *        disclaimer in the documentation and/or other materials
22  *        provided with the distribution.
23  *
24  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31  * SOFTWARE.
32  *
33  */
34 
35 #ifndef _OFI_RBUF_H_
36 #define _OFI_RBUF_H_
37 
38 #include "config.h"
39 
40 #include <assert.h>
41 #include <stdlib.h>
42 #include <sys/types.h>
43 #include <sys/socket.h>
44 #include <unistd.h>
45 #include <fcntl.h>
46 #include <ofi.h>
47 #include <ofi_file.h>
48 #include <stdlib.h>
49 
50 
51 /*
52  * Circular queue/array template
53  */
54 #define OFI_DECLARE_CIRQUE(entrytype, name)                     \
55 struct name {							\
56 	size_t		size;					\
57 	size_t		size_mask;				\
58 	size_t		rcnt;					\
59 	size_t		wcnt;					\
60 	entrytype	buf[];					\
61 };								\
62 								\
63 static inline void name ## _init(struct name *cq, size_t size)	\
64 {								\
65 	assert(size == roundup_power_of_two(size));		\
66 	cq->size = size;					\
67 	cq->size_mask = cq->size - 1;				\
68 	cq->rcnt = 0;						\
69 	cq->wcnt = 0;						\
70 }								\
71 								\
72 static inline struct name * name ## _create(size_t size)	\
73 {								\
74 	struct name *cq;					\
75 	cq = (struct name*) calloc(1, sizeof(*cq) + sizeof(entrytype) *	\
76 		    (roundup_power_of_two(size)));		\
77 	if (cq)							\
78 		name ##_init(cq, roundup_power_of_two(size));	\
79 	return cq;						\
80 }								\
81 								\
82 static inline void name ## _free(struct name *cq)		\
83 {								\
84 	free(cq);						\
85 }
86 
87 #define ofi_cirque_isempty(cq)		((cq)->wcnt == (cq)->rcnt)
88 #define ofi_cirque_usedcnt(cq)		((cq)->wcnt - (cq)->rcnt)
89 #define ofi_cirque_freecnt(cq)		((cq)->size - ofi_cirque_usedcnt(cq))
90 #define ofi_cirque_isfull(cq)		(ofi_cirque_freecnt(cq) <= 0)
91 
92 #define ofi_cirque_rindex(cq)		((cq)->rcnt & (cq)->size_mask)
93 #define ofi_cirque_windex(cq)		((cq)->wcnt & (cq)->size_mask)
94 #define ofi_cirque_head(cq)		(&(cq)->buf[ofi_cirque_rindex(cq)])
95 #define ofi_cirque_tail(cq)		(&(cq)->buf[ofi_cirque_windex(cq)])
96 #define ofi_cirque_insert(cq, x)	(cq)->buf[(cq)->wcnt++ & (cq)->size_mask] = x
97 #define ofi_cirque_remove(cq)		(&(cq)->buf[(cq)->rcnt++ & (cq)->size_mask])
98 #define ofi_cirque_discard(cq)		((cq)->rcnt++)
99 #define ofi_cirque_commit(cq)		((cq)->wcnt++)
100 
101 
102 /*
103  * Simple ring buffer
104  */
105 struct ofi_ringbuf {
106 	size_t		size;
107 	size_t		size_mask;
108 	size_t		rcnt;
109 	size_t		wcnt;
110 	size_t		wpos;
111 	void		*buf;
112 };
113 
ofi_rbinit(struct ofi_ringbuf * rb,size_t size)114 static inline int ofi_rbinit(struct ofi_ringbuf *rb, size_t size)
115 {
116 	rb->size = roundup_power_of_two(size);
117 	rb->size_mask = rb->size - 1;
118 	rb->rcnt = 0;
119 	rb->wcnt = 0;
120 	rb->wpos = 0;
121 	rb->buf = calloc(1, rb->size);
122 	if (!rb->buf)
123 		return -ENOMEM;
124 	return 0;
125 }
126 
ofi_rbreset(struct ofi_ringbuf * rb)127 static inline void ofi_rbreset(struct ofi_ringbuf *rb)
128 {
129 	rb->rcnt = 0;
130 	rb->wcnt = 0;
131 	rb->wpos = 0;
132 }
133 
ofi_rbfree(struct ofi_ringbuf * rb)134 static inline void ofi_rbfree(struct ofi_ringbuf *rb)
135 {
136 	free(rb->buf);
137 }
138 
ofi_rbfull(struct ofi_ringbuf * rb)139 static inline int ofi_rbfull(struct ofi_ringbuf *rb)
140 {
141 	return rb->wcnt - rb->rcnt >= rb->size;
142 }
143 
ofi_rbempty(struct ofi_ringbuf * rb)144 static inline int ofi_rbempty(struct ofi_ringbuf *rb)
145 {
146 	return rb->wcnt == rb->rcnt;
147 }
148 
ofi_rbused(struct ofi_ringbuf * rb)149 static inline size_t ofi_rbused(struct ofi_ringbuf *rb)
150 {
151 	return rb->wcnt - rb->rcnt;
152 }
153 
ofi_rbavail(struct ofi_ringbuf * rb)154 static inline size_t ofi_rbavail(struct ofi_ringbuf *rb)
155 {
156 	return rb->size - ofi_rbused(rb);
157 }
158 
ofi_rbwrite(struct ofi_ringbuf * rb,const void * buf,size_t len)159 static inline void ofi_rbwrite(struct ofi_ringbuf *rb, const void *buf, size_t len)
160 {
161 	size_t endlen;
162 
163 	endlen = rb->size - (rb->wpos & rb->size_mask);
164 	if (len <= endlen) {
165 		memcpy((char*)rb->buf + (rb->wpos & rb->size_mask), buf, len);
166 	} else {
167 		memcpy((char*)rb->buf + (rb->wpos & rb->size_mask), buf, endlen);
168 		memcpy(rb->buf, (char*)buf + endlen, len - endlen);
169 	}
170 	rb->wpos += len;
171 }
172 
ofi_rbcommit(struct ofi_ringbuf * rb)173 static inline void ofi_rbcommit(struct ofi_ringbuf *rb)
174 {
175 	rb->wcnt = rb->wpos;
176 }
177 
ofi_rbabort(struct ofi_ringbuf * rb)178 static inline void ofi_rbabort(struct ofi_ringbuf *rb)
179 {
180 	rb->wpos = rb->wcnt;
181 }
182 
ofi_rbpeek(struct ofi_ringbuf * rb,void * buf,size_t len)183 static inline void ofi_rbpeek(struct ofi_ringbuf *rb, void *buf, size_t len)
184 {
185 	size_t endlen;
186 
187 	endlen = rb->size - (rb->rcnt & rb->size_mask);
188 	if (len <= endlen) {
189 		memcpy(buf, (char*)rb->buf + (rb->rcnt & rb->size_mask), len);
190 	} else {
191 		memcpy(buf, (char*)rb->buf + (rb->rcnt & rb->size_mask), endlen);
192 		memcpy((char*)buf + endlen, rb->buf, len - endlen);
193 	}
194 }
195 
ofi_rbread(struct ofi_ringbuf * rb,void * buf,size_t len)196 static inline void ofi_rbread(struct ofi_ringbuf *rb, void *buf, size_t len)
197 {
198 	ofi_rbpeek(rb, buf, len);
199 	rb->rcnt += len;
200 }
201 
ofi_rbdiscard(struct ofi_ringbuf * rb,size_t len)202 static inline size_t ofi_rbdiscard(struct ofi_ringbuf *rb, size_t len)
203 {
204 	size_t used_len = MIN(ofi_rbused(rb), len);
205 	rb->rcnt += used_len;
206 	return used_len;
207 }
208 
209 /*
210  * Ring buffer with blocking read support using an fd
211  */
212 enum {
213 	OFI_RB_READ_FD,
214 	OFI_RB_WRITE_FD
215 };
216 
217 struct ofi_ringbuffd {
218 	struct ofi_ringbuf	rb;
219 	int			fdrcnt;
220 	int			fdwcnt;
221 	int			fd[2];
222 };
223 
ofi_rbfdinit(struct ofi_ringbuffd * rbfd,size_t size)224 static inline int ofi_rbfdinit(struct ofi_ringbuffd *rbfd, size_t size)
225 {
226 	int ret;
227 
228 	rbfd->fdrcnt = 0;
229 	rbfd->fdwcnt = 0;
230 	ret = ofi_rbinit(&rbfd->rb, size);
231 	if (ret)
232 		return ret;
233 
234 	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, rbfd->fd);
235 	if (ret < 0) {
236 		ret = -ofi_sockerr();
237 		goto err1;
238 	}
239 
240 	ret = fi_fd_nonblock(rbfd->fd[OFI_RB_READ_FD]);
241 	if (ret)
242 		goto err2;
243 
244 	return 0;
245 
246 err2:
247 	ofi_close_socket(rbfd->fd[0]);
248 	ofi_close_socket(rbfd->fd[1]);
249 err1:
250 	ofi_rbfree(&rbfd->rb);
251 	return ret;
252 }
253 
ofi_rbfdfree(struct ofi_ringbuffd * rbfd)254 static inline void ofi_rbfdfree(struct ofi_ringbuffd *rbfd)
255 {
256 	ofi_rbfree(&rbfd->rb);
257 	ofi_close_socket(rbfd->fd[0]);
258 	ofi_close_socket(rbfd->fd[1]);
259 }
260 
ofi_rbfdfull(struct ofi_ringbuffd * rbfd)261 static inline int ofi_rbfdfull(struct ofi_ringbuffd *rbfd)
262 {
263 	return ofi_rbfull(&rbfd->rb);
264 }
265 
ofi_rbfdempty(struct ofi_ringbuffd * rbfd)266 static inline int ofi_rbfdempty(struct ofi_ringbuffd *rbfd)
267 {
268 	return ofi_rbempty(&rbfd->rb);
269 }
270 
ofi_rbfdused(struct ofi_ringbuffd * rbfd)271 static inline size_t ofi_rbfdused(struct ofi_ringbuffd *rbfd)
272 {
273 	return ofi_rbused(&rbfd->rb);
274 }
275 
ofi_rbfdavail(struct ofi_ringbuffd * rbfd)276 static inline size_t ofi_rbfdavail(struct ofi_ringbuffd *rbfd)
277 {
278 	return ofi_rbavail(&rbfd->rb);
279 }
280 
ofi_rbfdsignal(struct ofi_ringbuffd * rbfd)281 static inline void ofi_rbfdsignal(struct ofi_ringbuffd *rbfd)
282 {
283 	char c = 0;
284 	if (rbfd->fdwcnt == rbfd->fdrcnt) {
285 		if (ofi_write_socket(rbfd->fd[OFI_RB_WRITE_FD], &c, sizeof c) == sizeof c)
286 			rbfd->fdwcnt++;
287 	}
288 }
289 
ofi_rbfdreset(struct ofi_ringbuffd * rbfd)290 static inline void ofi_rbfdreset(struct ofi_ringbuffd *rbfd)
291 {
292 	char c;
293 
294 	if (ofi_rbfdempty(rbfd) && (rbfd->fdrcnt != rbfd->fdwcnt)) {
295 		if (ofi_read_socket(rbfd->fd[OFI_RB_READ_FD], &c, sizeof c) == sizeof c)
296 			rbfd->fdrcnt++;
297 	}
298 }
299 
ofi_rbfdwrite(struct ofi_ringbuffd * rbfd,const void * buf,size_t len)300 static inline void ofi_rbfdwrite(struct ofi_ringbuffd *rbfd, const void *buf, size_t len)
301 {
302 	ofi_rbwrite(&rbfd->rb, buf, len);
303 }
304 
ofi_rbfdcommit(struct ofi_ringbuffd * rbfd)305 static inline void ofi_rbfdcommit(struct ofi_ringbuffd *rbfd)
306 {
307 	ofi_rbcommit(&rbfd->rb);
308 	ofi_rbfdsignal(rbfd);
309 }
310 
ofi_rbfdabort(struct ofi_ringbuffd * rbfd)311 static inline void ofi_rbfdabort(struct ofi_ringbuffd *rbfd)
312 {
313 	ofi_rbabort(&rbfd->rb);
314 }
315 
ofi_rbfdpeek(struct ofi_ringbuffd * rbfd,void * buf,size_t len)316 static inline void ofi_rbfdpeek(struct ofi_ringbuffd *rbfd, void *buf, size_t len)
317 {
318 	ofi_rbpeek(&rbfd->rb, buf, len);
319 }
320 
ofi_rbfdread(struct ofi_ringbuffd * rbfd,void * buf,size_t len)321 static inline void ofi_rbfdread(struct ofi_ringbuffd *rbfd, void *buf, size_t len)
322 {
323 	ofi_rbread(&rbfd->rb, buf, len);
324 	ofi_rbfdreset(rbfd);
325 }
326 
ofi_rbfdsread(struct ofi_ringbuffd * rbfd,void * buf,size_t len,int timeout)327 static inline size_t ofi_rbfdsread(struct ofi_ringbuffd *rbfd, void *buf, size_t len,
328 				int timeout)
329 {
330 	int ret;
331 	size_t avail;
332 
333 	avail = ofi_rbfdused(rbfd);
334 	if (avail) {
335 		len = MIN(len, avail);
336 		ofi_rbfdread(rbfd, buf, len);
337 		return len;
338 	}
339 
340 	ret = fi_poll_fd(rbfd->fd[OFI_RB_READ_FD], timeout);
341 	if (ret == 1) {
342 		len = MIN(len, ofi_rbfdused(rbfd));
343 		ofi_rbfdread(rbfd, buf, len);
344 		return len;
345 	}
346 	return ret;
347 }
348 
ofi_rbfdwait(struct ofi_ringbuffd * rbfd,int timeout)349 static inline size_t ofi_rbfdwait(struct ofi_ringbuffd *rbfd, int timeout)
350 {
351 	return  fi_poll_fd(rbfd->fd[OFI_RB_READ_FD], timeout);
352 }
353 
354 
355 #endif /* _OFI_RBUF_H_ */
356