1 /*
2 * Copyright (c) 2014 Intel Corporation. All rights reserved.
3 * Copyright (c) 2016 Cisco Systems, Inc. All rights reserved.
4 *
5 * This software is available to you under a choice of one of two
6 * licenses. You may choose to be licensed under the terms of the GNU
7 * General Public License (GPL) Version 2, available from the file
8 * COPYING in the main directory of this source tree, or the
9 * BSD license below:
10 *
11 * Redistribution and use in source and binary forms, with or
12 * without modification, are permitted provided that the following
13 * conditions are met:
14 *
15 * - Redistributions of source code must retain the above
16 * copyright notice, this list of conditions and the following
17 * disclaimer.
18 *
19 * - Redistributions in binary form must reproduce the above
20 * copyright notice, this list of conditions and the following
21 * disclaimer in the documentation and/or other materials
22 * provided with the distribution.
23 *
24 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31 * SOFTWARE.
32 *
33 */
34
35 #ifndef _OFI_RBUF_H_
36 #define _OFI_RBUF_H_
37
38 #include "config.h"
39
40 #include <assert.h>
41 #include <stdlib.h>
42 #include <sys/types.h>
43 #include <sys/socket.h>
44 #include <unistd.h>
45 #include <fcntl.h>
46 #include <ofi.h>
47 #include <ofi_file.h>
48 #include <stdlib.h>
49
50
51 /*
52 * Circular queue/array template
53 */
54 #define OFI_DECLARE_CIRQUE(entrytype, name) \
55 struct name { \
56 size_t size; \
57 size_t size_mask; \
58 size_t rcnt; \
59 size_t wcnt; \
60 entrytype buf[]; \
61 }; \
62 \
63 static inline void name ## _init(struct name *cq, size_t size) \
64 { \
65 assert(size == roundup_power_of_two(size)); \
66 cq->size = size; \
67 cq->size_mask = cq->size - 1; \
68 cq->rcnt = 0; \
69 cq->wcnt = 0; \
70 } \
71 \
72 static inline struct name * name ## _create(size_t size) \
73 { \
74 struct name *cq; \
75 cq = (struct name*) calloc(1, sizeof(*cq) + sizeof(entrytype) * \
76 (roundup_power_of_two(size))); \
77 if (cq) \
78 name ##_init(cq, roundup_power_of_two(size)); \
79 return cq; \
80 } \
81 \
82 static inline void name ## _free(struct name *cq) \
83 { \
84 free(cq); \
85 }
86
87 #define ofi_cirque_isempty(cq) ((cq)->wcnt == (cq)->rcnt)
88 #define ofi_cirque_usedcnt(cq) ((cq)->wcnt - (cq)->rcnt)
89 #define ofi_cirque_freecnt(cq) ((cq)->size - ofi_cirque_usedcnt(cq))
90 #define ofi_cirque_isfull(cq) (ofi_cirque_freecnt(cq) <= 0)
91
92 #define ofi_cirque_rindex(cq) ((cq)->rcnt & (cq)->size_mask)
93 #define ofi_cirque_windex(cq) ((cq)->wcnt & (cq)->size_mask)
94 #define ofi_cirque_head(cq) (&(cq)->buf[ofi_cirque_rindex(cq)])
95 #define ofi_cirque_tail(cq) (&(cq)->buf[ofi_cirque_windex(cq)])
96 #define ofi_cirque_insert(cq, x) (cq)->buf[(cq)->wcnt++ & (cq)->size_mask] = x
97 #define ofi_cirque_remove(cq) (&(cq)->buf[(cq)->rcnt++ & (cq)->size_mask])
98 #define ofi_cirque_discard(cq) ((cq)->rcnt++)
99 #define ofi_cirque_commit(cq) ((cq)->wcnt++)
100
101
102 /*
103 * Simple ring buffer
104 */
105 struct ofi_ringbuf {
106 size_t size;
107 size_t size_mask;
108 size_t rcnt;
109 size_t wcnt;
110 size_t wpos;
111 void *buf;
112 };
113
ofi_rbinit(struct ofi_ringbuf * rb,size_t size)114 static inline int ofi_rbinit(struct ofi_ringbuf *rb, size_t size)
115 {
116 rb->size = roundup_power_of_two(size);
117 rb->size_mask = rb->size - 1;
118 rb->rcnt = 0;
119 rb->wcnt = 0;
120 rb->wpos = 0;
121 rb->buf = calloc(1, rb->size);
122 if (!rb->buf)
123 return -ENOMEM;
124 return 0;
125 }
126
ofi_rbreset(struct ofi_ringbuf * rb)127 static inline void ofi_rbreset(struct ofi_ringbuf *rb)
128 {
129 rb->rcnt = 0;
130 rb->wcnt = 0;
131 rb->wpos = 0;
132 }
133
ofi_rbfree(struct ofi_ringbuf * rb)134 static inline void ofi_rbfree(struct ofi_ringbuf *rb)
135 {
136 free(rb->buf);
137 }
138
ofi_rbfull(struct ofi_ringbuf * rb)139 static inline int ofi_rbfull(struct ofi_ringbuf *rb)
140 {
141 return rb->wcnt - rb->rcnt >= rb->size;
142 }
143
ofi_rbempty(struct ofi_ringbuf * rb)144 static inline int ofi_rbempty(struct ofi_ringbuf *rb)
145 {
146 return rb->wcnt == rb->rcnt;
147 }
148
ofi_rbused(struct ofi_ringbuf * rb)149 static inline size_t ofi_rbused(struct ofi_ringbuf *rb)
150 {
151 return rb->wcnt - rb->rcnt;
152 }
153
ofi_rbavail(struct ofi_ringbuf * rb)154 static inline size_t ofi_rbavail(struct ofi_ringbuf *rb)
155 {
156 return rb->size - ofi_rbused(rb);
157 }
158
ofi_rbwrite(struct ofi_ringbuf * rb,const void * buf,size_t len)159 static inline void ofi_rbwrite(struct ofi_ringbuf *rb, const void *buf, size_t len)
160 {
161 size_t endlen;
162
163 endlen = rb->size - (rb->wpos & rb->size_mask);
164 if (len <= endlen) {
165 memcpy((char*)rb->buf + (rb->wpos & rb->size_mask), buf, len);
166 } else {
167 memcpy((char*)rb->buf + (rb->wpos & rb->size_mask), buf, endlen);
168 memcpy(rb->buf, (char*)buf + endlen, len - endlen);
169 }
170 rb->wpos += len;
171 }
172
ofi_rbcommit(struct ofi_ringbuf * rb)173 static inline void ofi_rbcommit(struct ofi_ringbuf *rb)
174 {
175 rb->wcnt = rb->wpos;
176 }
177
ofi_rbabort(struct ofi_ringbuf * rb)178 static inline void ofi_rbabort(struct ofi_ringbuf *rb)
179 {
180 rb->wpos = rb->wcnt;
181 }
182
ofi_rbpeek(struct ofi_ringbuf * rb,void * buf,size_t len)183 static inline void ofi_rbpeek(struct ofi_ringbuf *rb, void *buf, size_t len)
184 {
185 size_t endlen;
186
187 endlen = rb->size - (rb->rcnt & rb->size_mask);
188 if (len <= endlen) {
189 memcpy(buf, (char*)rb->buf + (rb->rcnt & rb->size_mask), len);
190 } else {
191 memcpy(buf, (char*)rb->buf + (rb->rcnt & rb->size_mask), endlen);
192 memcpy((char*)buf + endlen, rb->buf, len - endlen);
193 }
194 }
195
ofi_rbread(struct ofi_ringbuf * rb,void * buf,size_t len)196 static inline void ofi_rbread(struct ofi_ringbuf *rb, void *buf, size_t len)
197 {
198 ofi_rbpeek(rb, buf, len);
199 rb->rcnt += len;
200 }
201
ofi_rbdiscard(struct ofi_ringbuf * rb,size_t len)202 static inline size_t ofi_rbdiscard(struct ofi_ringbuf *rb, size_t len)
203 {
204 size_t used_len = MIN(ofi_rbused(rb), len);
205 rb->rcnt += used_len;
206 return used_len;
207 }
208
209 /*
210 * Ring buffer with blocking read support using an fd
211 */
212 enum {
213 OFI_RB_READ_FD,
214 OFI_RB_WRITE_FD
215 };
216
217 struct ofi_ringbuffd {
218 struct ofi_ringbuf rb;
219 int fdrcnt;
220 int fdwcnt;
221 int fd[2];
222 };
223
ofi_rbfdinit(struct ofi_ringbuffd * rbfd,size_t size)224 static inline int ofi_rbfdinit(struct ofi_ringbuffd *rbfd, size_t size)
225 {
226 int ret;
227
228 rbfd->fdrcnt = 0;
229 rbfd->fdwcnt = 0;
230 ret = ofi_rbinit(&rbfd->rb, size);
231 if (ret)
232 return ret;
233
234 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, rbfd->fd);
235 if (ret < 0) {
236 ret = -ofi_sockerr();
237 goto err1;
238 }
239
240 ret = fi_fd_nonblock(rbfd->fd[OFI_RB_READ_FD]);
241 if (ret)
242 goto err2;
243
244 return 0;
245
246 err2:
247 ofi_close_socket(rbfd->fd[0]);
248 ofi_close_socket(rbfd->fd[1]);
249 err1:
250 ofi_rbfree(&rbfd->rb);
251 return ret;
252 }
253
ofi_rbfdfree(struct ofi_ringbuffd * rbfd)254 static inline void ofi_rbfdfree(struct ofi_ringbuffd *rbfd)
255 {
256 ofi_rbfree(&rbfd->rb);
257 ofi_close_socket(rbfd->fd[0]);
258 ofi_close_socket(rbfd->fd[1]);
259 }
260
ofi_rbfdfull(struct ofi_ringbuffd * rbfd)261 static inline int ofi_rbfdfull(struct ofi_ringbuffd *rbfd)
262 {
263 return ofi_rbfull(&rbfd->rb);
264 }
265
ofi_rbfdempty(struct ofi_ringbuffd * rbfd)266 static inline int ofi_rbfdempty(struct ofi_ringbuffd *rbfd)
267 {
268 return ofi_rbempty(&rbfd->rb);
269 }
270
ofi_rbfdused(struct ofi_ringbuffd * rbfd)271 static inline size_t ofi_rbfdused(struct ofi_ringbuffd *rbfd)
272 {
273 return ofi_rbused(&rbfd->rb);
274 }
275
ofi_rbfdavail(struct ofi_ringbuffd * rbfd)276 static inline size_t ofi_rbfdavail(struct ofi_ringbuffd *rbfd)
277 {
278 return ofi_rbavail(&rbfd->rb);
279 }
280
ofi_rbfdsignal(struct ofi_ringbuffd * rbfd)281 static inline void ofi_rbfdsignal(struct ofi_ringbuffd *rbfd)
282 {
283 char c = 0;
284 if (rbfd->fdwcnt == rbfd->fdrcnt) {
285 if (ofi_write_socket(rbfd->fd[OFI_RB_WRITE_FD], &c, sizeof c) == sizeof c)
286 rbfd->fdwcnt++;
287 }
288 }
289
ofi_rbfdreset(struct ofi_ringbuffd * rbfd)290 static inline void ofi_rbfdreset(struct ofi_ringbuffd *rbfd)
291 {
292 char c;
293
294 if (ofi_rbfdempty(rbfd) && (rbfd->fdrcnt != rbfd->fdwcnt)) {
295 if (ofi_read_socket(rbfd->fd[OFI_RB_READ_FD], &c, sizeof c) == sizeof c)
296 rbfd->fdrcnt++;
297 }
298 }
299
ofi_rbfdwrite(struct ofi_ringbuffd * rbfd,const void * buf,size_t len)300 static inline void ofi_rbfdwrite(struct ofi_ringbuffd *rbfd, const void *buf, size_t len)
301 {
302 ofi_rbwrite(&rbfd->rb, buf, len);
303 }
304
ofi_rbfdcommit(struct ofi_ringbuffd * rbfd)305 static inline void ofi_rbfdcommit(struct ofi_ringbuffd *rbfd)
306 {
307 ofi_rbcommit(&rbfd->rb);
308 ofi_rbfdsignal(rbfd);
309 }
310
ofi_rbfdabort(struct ofi_ringbuffd * rbfd)311 static inline void ofi_rbfdabort(struct ofi_ringbuffd *rbfd)
312 {
313 ofi_rbabort(&rbfd->rb);
314 }
315
ofi_rbfdpeek(struct ofi_ringbuffd * rbfd,void * buf,size_t len)316 static inline void ofi_rbfdpeek(struct ofi_ringbuffd *rbfd, void *buf, size_t len)
317 {
318 ofi_rbpeek(&rbfd->rb, buf, len);
319 }
320
ofi_rbfdread(struct ofi_ringbuffd * rbfd,void * buf,size_t len)321 static inline void ofi_rbfdread(struct ofi_ringbuffd *rbfd, void *buf, size_t len)
322 {
323 ofi_rbread(&rbfd->rb, buf, len);
324 ofi_rbfdreset(rbfd);
325 }
326
ofi_rbfdsread(struct ofi_ringbuffd * rbfd,void * buf,size_t len,int timeout)327 static inline size_t ofi_rbfdsread(struct ofi_ringbuffd *rbfd, void *buf, size_t len,
328 int timeout)
329 {
330 int ret;
331 size_t avail;
332
333 avail = ofi_rbfdused(rbfd);
334 if (avail) {
335 len = MIN(len, avail);
336 ofi_rbfdread(rbfd, buf, len);
337 return len;
338 }
339
340 ret = fi_poll_fd(rbfd->fd[OFI_RB_READ_FD], timeout);
341 if (ret == 1) {
342 len = MIN(len, ofi_rbfdused(rbfd));
343 ofi_rbfdread(rbfd, buf, len);
344 return len;
345 }
346 return ret;
347 }
348
ofi_rbfdwait(struct ofi_ringbuffd * rbfd,int timeout)349 static inline size_t ofi_rbfdwait(struct ofi_ringbuffd *rbfd, int timeout)
350 {
351 return fi_poll_fd(rbfd->fd[OFI_RB_READ_FD], timeout);
352 }
353
354
355 #endif /* _OFI_RBUF_H_ */
356