xref: /freebsd/contrib/ofed/librdmacm/rsocket.c (revision e0c4386e)
1 /*
2  * Copyright (c) 2008-2014 Intel Corporation.  All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  *
32  */
33 #define _GNU_SOURCE
34 #include <config.h>
35 
36 #include <sys/types.h>
37 #include <sys/socket.h>
38 #include <sys/time.h>
39 #include <infiniband/endian.h>
40 #include <stdarg.h>
41 #include <netdb.h>
42 #include <unistd.h>
43 #include <fcntl.h>
44 #include <stdio.h>
45 #include <stddef.h>
46 #include <string.h>
47 #include <netinet/tcp.h>
48 #include <sys/epoll.h>
49 #include <search.h>
50 #include <byteswap.h>
51 #include <util/compiler.h>
52 
53 #include <rdma/rdma_cma.h>
54 #include <rdma/rdma_verbs.h>
55 #include <rdma/rsocket.h>
56 #include "cma.h"
57 #include "indexer.h"
58 
59 #define RS_OLAP_START_SIZE 2048
60 #define RS_MAX_TRANSFER 65536
61 #define RS_SNDLOWAT 2048
62 #define RS_QP_MIN_SIZE 16
63 #define RS_QP_MAX_SIZE 0xFFFE
64 #define RS_QP_CTRL_SIZE 4	/* must be power of 2 */
65 #define RS_CONN_RETRIES 6
66 #define RS_SGL_SIZE 2
67 static struct index_map idm;
68 static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
69 
70 struct rsocket;
71 
72 enum {
73 	RS_SVC_NOOP,
74 	RS_SVC_ADD_DGRAM,
75 	RS_SVC_REM_DGRAM,
76 	RS_SVC_ADD_KEEPALIVE,
77 	RS_SVC_REM_KEEPALIVE,
78 	RS_SVC_MOD_KEEPALIVE
79 };
80 
81 struct rs_svc_msg {
82 	uint32_t cmd;
83 	uint32_t status;
84 	struct rsocket *rs;
85 };
86 
87 struct rs_svc {
88 	pthread_t id;
89 	int sock[2];
90 	int cnt;
91 	int size;
92 	int context_size;
93 	void *(*run)(void *svc);
94 	struct rsocket **rss;
95 	void *contexts;
96 };
97 
98 static struct pollfd *udp_svc_fds;
99 static void *udp_svc_run(void *arg);
100 static struct rs_svc udp_svc = {
101 	.context_size = sizeof(*udp_svc_fds),
102 	.run = udp_svc_run
103 };
104 static uint32_t *tcp_svc_timeouts;
105 static void *tcp_svc_run(void *arg);
106 static struct rs_svc tcp_svc = {
107 	.context_size = sizeof(*tcp_svc_timeouts),
108 	.run = tcp_svc_run
109 };
110 
111 static uint16_t def_iomap_size = 0;
112 static uint16_t def_inline = 64;
113 static uint16_t def_sqsize = 384;
114 static uint16_t def_rqsize = 384;
115 static uint32_t def_mem = (1 << 17);
116 static uint32_t def_wmem = (1 << 17);
117 static uint32_t polling_time = 10;
118 
119 /*
120  * Immediate data format is determined by the upper bits
121  * bit 31: message type, 0 - data, 1 - control
122  * bit 30: buffers updated, 0 - target, 1 - direct-receive
123  * bit 29: more data, 0 - end of transfer, 1 - more data available
124  *
125  * for data transfers:
126  * bits [28:0]: bytes transferred
127  * for control messages:
128  * SGL, CTRL
129  * bits [28-0]: receive credits granted
130  * IOMAP_SGL
131  * bits [28-16]: reserved, bits [15-0]: index
132  */
133 
134 enum {
135 	RS_OP_DATA,
136 	RS_OP_RSVD_DATA_MORE,
137 	RS_OP_WRITE, /* opcode is not transmitted over the network */
138 	RS_OP_RSVD_DRA_MORE,
139 	RS_OP_SGL,
140 	RS_OP_RSVD,
141 	RS_OP_IOMAP_SGL,
142 	RS_OP_CTRL
143 };
144 #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
145 #define rs_msg_op(imm_data)   (imm_data >> 29)
146 #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
147 #define RS_MSG_SIZE	      sizeof(uint32_t)
148 
149 #define RS_WR_ID_FLAG_RECV (((uint64_t) 1) << 63)
150 #define RS_WR_ID_FLAG_MSG_SEND (((uint64_t) 1) << 62) /* See RS_OPT_MSG_SEND */
151 #define rs_send_wr_id(data) ((uint64_t) data)
152 #define rs_recv_wr_id(data) (RS_WR_ID_FLAG_RECV | (uint64_t) data)
153 #define rs_wr_is_recv(wr_id) (wr_id & RS_WR_ID_FLAG_RECV)
154 #define rs_wr_is_msg_send(wr_id) (wr_id & RS_WR_ID_FLAG_MSG_SEND)
155 #define rs_wr_data(wr_id) ((uint32_t) wr_id)
156 
157 enum {
158 	RS_CTRL_DISCONNECT,
159 	RS_CTRL_KEEPALIVE,
160 	RS_CTRL_SHUTDOWN
161 };
162 
163 struct rs_msg {
164 	uint32_t op;
165 	uint32_t data;
166 };
167 
168 struct ds_qp;
169 
170 struct ds_rmsg {
171 	struct ds_qp	*qp;
172 	uint32_t	offset;
173 	uint32_t	length;
174 };
175 
176 struct ds_smsg {
177 	struct ds_smsg	*next;
178 };
179 
180 struct rs_sge {
181 	uint64_t addr;
182 	uint32_t key;
183 	uint32_t length;
184 };
185 
186 struct rs_iomap {
187 	uint64_t offset;
188 	struct rs_sge sge;
189 };
190 
191 struct rs_iomap_mr {
192 	uint64_t offset;
193 	struct ibv_mr *mr;
194 	dlist_entry entry;
195 	_Atomic(int) refcnt;
196 	int index;	/* -1 if mapping is local and not in iomap_list */
197 };
198 
199 #define RS_MAX_CTRL_MSG    (sizeof(struct rs_sge))
200 #define rs_host_is_net()   (__BYTE_ORDER == __BIG_ENDIAN)
201 #define RS_CONN_FLAG_NET   (1 << 0)
202 #define RS_CONN_FLAG_IOMAP (1 << 1)
203 
204 struct rs_conn_data {
205 	uint8_t		  version;
206 	uint8_t		  flags;
207 	__be16		  credits;
208 	uint8_t		  reserved[3];
209 	uint8_t		  target_iomap_size;
210 	struct rs_sge	  target_sgl;
211 	struct rs_sge	  data_buf;
212 };
213 
214 struct rs_conn_private_data {
215 	union {
216 		struct rs_conn_data		conn_data;
217 		struct {
218 			struct ib_connect_hdr	ib_hdr;
219 			struct rs_conn_data	conn_data;
220 		} af_ib;
221 	};
222 };
223 
224 /*
225  * rsocket states are ordered as passive, connecting, connected, disconnected.
226  */
227 enum rs_state {
228 	rs_init,
229 	rs_bound	   =		    0x0001,
230 	rs_listening	   =		    0x0002,
231 	rs_opening	   =		    0x0004,
232 	rs_resolving_addr  = rs_opening |   0x0010,
233 	rs_resolving_route = rs_opening |   0x0020,
234 	rs_connecting      = rs_opening |   0x0040,
235 	rs_accepting       = rs_opening |   0x0080,
236 	rs_connected	   =		    0x0100,
237 	rs_writable 	   =		    0x0200,
238 	rs_readable	   =		    0x0400,
239 	rs_connect_rdwr    = rs_connected | rs_readable | rs_writable,
240 	rs_connect_error   =		    0x0800,
241 	rs_disconnected	   =		    0x1000,
242 	rs_error	   =		    0x2000,
243 };
244 
245 #define RS_OPT_SWAP_SGL   (1 << 0)
246 /*
247  * iWarp does not support RDMA write with immediate data.  For iWarp, we
248  * transfer rsocket messages as inline sends.
249  */
250 #define RS_OPT_MSG_SEND   (1 << 1)
251 #define RS_OPT_SVC_ACTIVE (1 << 2)
252 
253 union socket_addr {
254 	struct sockaddr		sa;
255 	struct sockaddr_in	sin;
256 	struct sockaddr_in6	sin6;
257 };
258 
259 struct ds_header {
260 	uint8_t		  version;
261 	uint8_t		  length;
262 	__be16		  port;
263 	union {
264 		__be32  ipv4;
265 		struct {
266 			__be32 flowinfo;
267 			uint8_t  addr[16];
268 		} ipv6;
269 	} addr;
270 };
271 
272 #define DS_IPV4_HDR_LEN  8
273 #define DS_IPV6_HDR_LEN 24
274 
275 struct ds_dest {
276 	union socket_addr addr;	/* must be first */
277 	struct ds_qp	  *qp;
278 	struct ibv_ah	  *ah;
279 	uint32_t	   qpn;
280 };
281 
282 struct ds_qp {
283 	dlist_entry	  list;
284 	struct rsocket	  *rs;
285 	struct rdma_cm_id *cm_id;
286 	struct ds_header  hdr;
287 	struct ds_dest	  dest;
288 
289 	struct ibv_mr	  *smr;
290 	struct ibv_mr	  *rmr;
291 	uint8_t		  *rbuf;
292 
293 	int		  cq_armed;
294 };
295 
296 struct rsocket {
297 	int		  type;
298 	int		  index;
299 	fastlock_t	  slock;
300 	fastlock_t	  rlock;
301 	fastlock_t	  cq_lock;
302 	fastlock_t	  cq_wait_lock;
303 	fastlock_t	  map_lock; /* acquire slock first if needed */
304 
305 	union {
306 		/* data stream */
307 		struct {
308 			struct rdma_cm_id *cm_id;
309 			uint64_t	  tcp_opts;
310 			unsigned int	  keepalive_time;
311 
312 			unsigned int	  ctrl_seqno;
313 			unsigned int	  ctrl_max_seqno;
314 			uint16_t	  sseq_no;
315 			uint16_t	  sseq_comp;
316 			uint16_t	  rseq_no;
317 			uint16_t	  rseq_comp;
318 
319 			int		  remote_sge;
320 			struct rs_sge	  remote_sgl;
321 			struct rs_sge	  remote_iomap;
322 
323 			struct ibv_mr	  *target_mr;
324 			int		  target_sge;
325 			int		  target_iomap_size;
326 			void		  *target_buffer_list;
327 			volatile struct rs_sge	  *target_sgl;
328 			struct rs_iomap   *target_iomap;
329 
330 			int		  rbuf_msg_index;
331 			int		  rbuf_bytes_avail;
332 			int		  rbuf_free_offset;
333 			int		  rbuf_offset;
334 			struct ibv_mr	  *rmr;
335 			uint8_t		  *rbuf;
336 
337 			int		  sbuf_bytes_avail;
338 			struct ibv_mr	  *smr;
339 			struct ibv_sge	  ssgl[2];
340 		};
341 		/* datagram */
342 		struct {
343 			struct ds_qp	  *qp_list;
344 			void		  *dest_map;
345 			struct ds_dest    *conn_dest;
346 
347 			int		  udp_sock;
348 			int		  epfd;
349 			int		  rqe_avail;
350 			struct ds_smsg	  *smsg_free;
351 		};
352 	};
353 
354 	int		  opts;
355 	int		  fd_flags;
356 	uint64_t	  so_opts;
357 	uint64_t	  ipv6_opts;
358 	void		  *optval;
359 	size_t		  optlen;
360 	int		  state;
361 	int		  cq_armed;
362 	int		  retries;
363 	int		  err;
364 
365 	int		  sqe_avail;
366 	uint32_t	  sbuf_size;
367 	uint16_t	  sq_size;
368 	uint16_t	  sq_inline;
369 
370 	uint32_t	  rbuf_size;
371 	uint16_t	  rq_size;
372 	int		  rmsg_head;
373 	int		  rmsg_tail;
374 	union {
375 		struct rs_msg	  *rmsg;
376 		struct ds_rmsg	  *dmsg;
377 	};
378 
379 	uint8_t		  *sbuf;
380 	struct rs_iomap_mr *remote_iomappings;
381 	dlist_entry	  iomap_list;
382 	dlist_entry	  iomap_queue;
383 	int		  iomap_pending;
384 	int		  unack_cqe;
385 };
386 
387 #define DS_UDP_TAG 0x55555555
388 
389 struct ds_udp_header {
390 	__be32		  tag;
391 	uint8_t		  version;
392 	uint8_t		  op;
393 	uint8_t		  length;
394 	uint8_t		  reserved;
395 	__be32		  qpn;  /* lower 8-bits reserved */
396 	union {
397 		__be32	 ipv4;
398 		uint8_t  ipv6[16];
399 	} addr;
400 };
401 
402 #define DS_UDP_IPV4_HDR_LEN 16
403 #define DS_UDP_IPV6_HDR_LEN 28
404 
405 #define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
406 
407 static void write_all(int fd, const void *msg, size_t len)
408 {
409 	// FIXME: if fd is a socket this really needs to handle EINTR and other conditions.
410 	ssize_t rc = write(fd, msg, len);
411 	assert(rc == len);
412 }
413 
414 static void read_all(int fd, void *msg, size_t len)
415 {
416 	// FIXME: if fd is a socket this really needs to handle EINTR and other conditions.
417 	ssize_t rc = read(fd, msg, len);
418 	assert(rc == len);
419 }
420 
421 static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
422 {
423 	if (!rs->qp_list)
424 		dlist_init(&qp->list);
425 	else
426 		dlist_insert_head(&qp->list, &rs->qp_list->list);
427 	rs->qp_list = qp;
428 }
429 
430 static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
431 {
432 	if (qp->list.next != &qp->list) {
433 		rs->qp_list = ds_next_qp(qp);
434 		dlist_remove(&qp->list);
435 	} else {
436 		rs->qp_list = NULL;
437 	}
438 }
439 
440 static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd)
441 {
442 	struct rs_svc_msg msg;
443 	int ret;
444 
445 	pthread_mutex_lock(&mut);
446 	if (!svc->cnt) {
447 		ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc->sock);
448 		if (ret)
449 			goto unlock;
450 
451 		ret = pthread_create(&svc->id, NULL, svc->run, svc);
452 		if (ret) {
453 			ret = ERR(ret);
454 			goto closepair;
455 		}
456 	}
457 
458 	msg.cmd = cmd;
459 	msg.status = EINVAL;
460 	msg.rs = rs;
461 	write_all(svc->sock[0], &msg, sizeof msg);
462 	read_all(svc->sock[0], &msg, sizeof msg);
463 	ret = rdma_seterrno(msg.status);
464 	if (svc->cnt)
465 		goto unlock;
466 
467 	pthread_join(svc->id, NULL);
468 closepair:
469 	close(svc->sock[0]);
470 	close(svc->sock[1]);
471 unlock:
472 	pthread_mutex_unlock(&mut);
473 	return ret;
474 }
475 
476 static int ds_compare_addr(const void *dst1, const void *dst2)
477 {
478 	const struct sockaddr *sa1, *sa2;
479 	size_t len;
480 
481 	sa1 = (const struct sockaddr *) dst1;
482 	sa2 = (const struct sockaddr *) dst2;
483 
484 	len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
485 	      sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in);
486 	return memcmp(dst1, dst2, len);
487 }
488 
489 static int rs_value_to_scale(int value, int bits)
490 {
491 	return value <= (1 << (bits - 1)) ?
492 	       value : (1 << (bits - 1)) | (value >> bits);
493 }
494 
495 static int rs_scale_to_value(int value, int bits)
496 {
497 	return value <= (1 << (bits - 1)) ?
498 	       value : (value & ~(1 << (bits - 1))) << bits;
499 }
500 
501 /* gcc > ~5 will not allow (void)fscanf to suppress -Wunused-result, but this
502    will do it.  In this case ignoring the result is OK (but horribly
503    unfriendly to user) since the library has a sane default. */
504 #define failable_fscanf(f, fmt, ...)                                           \
505 	{                                                                      \
506 		int rc = fscanf(f, fmt, __VA_ARGS__);                          \
507 		(void) rc;                                                     \
508 	}
509 
510 static void rs_configure(void)
511 {
512 	FILE *f;
513 	static int init;
514 
515 	if (init)
516 		return;
517 
518 	pthread_mutex_lock(&mut);
519 	if (init)
520 		goto out;
521 
522 	if (ucma_init())
523 		goto out;
524 	ucma_ib_init();
525 
526 	if ((f = fopen(RS_CONF_DIR "/polling_time", "r"))) {
527 		failable_fscanf(f, "%u", &polling_time);
528 		fclose(f);
529 	}
530 
531 	if ((f = fopen(RS_CONF_DIR "/inline_default", "r"))) {
532 		failable_fscanf(f, "%hu", &def_inline);
533 		fclose(f);
534 	}
535 
536 	if ((f = fopen(RS_CONF_DIR "/sqsize_default", "r"))) {
537 		failable_fscanf(f, "%hu", &def_sqsize);
538 		fclose(f);
539 	}
540 
541 	if ((f = fopen(RS_CONF_DIR "/rqsize_default", "r"))) {
542 		failable_fscanf(f, "%hu", &def_rqsize);
543 		fclose(f);
544 	}
545 
546 	if ((f = fopen(RS_CONF_DIR "/mem_default", "r"))) {
547 		failable_fscanf(f, "%u", &def_mem);
548 		fclose(f);
549 
550 		if (def_mem < 1)
551 			def_mem = 1;
552 	}
553 
554 	if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
555 		failable_fscanf(f, "%u", &def_wmem);
556 		fclose(f);
557 		if (def_wmem < RS_SNDLOWAT)
558 			def_wmem = RS_SNDLOWAT << 1;
559 	}
560 
561 	if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) {
562 		failable_fscanf(f, "%hu", &def_iomap_size);
563 		fclose(f);
564 
565 		/* round to supported values */
566 		def_iomap_size = (uint8_t) rs_value_to_scale(
567 			(uint16_t) rs_scale_to_value(def_iomap_size, 8), 8);
568 	}
569 	init = 1;
570 out:
571 	pthread_mutex_unlock(&mut);
572 }
573 
574 static int rs_insert(struct rsocket *rs, int index)
575 {
576 	pthread_mutex_lock(&mut);
577 	rs->index = idm_set(&idm, index, rs);
578 	pthread_mutex_unlock(&mut);
579 	return rs->index;
580 }
581 
582 static void rs_remove(struct rsocket *rs)
583 {
584 	pthread_mutex_lock(&mut);
585 	idm_clear(&idm, rs->index);
586 	pthread_mutex_unlock(&mut);
587 }
588 
589 /* We only inherit from listening sockets */
590 static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
591 {
592 	struct rsocket *rs;
593 
594 	rs = calloc(1, sizeof(*rs));
595 	if (!rs)
596 		return NULL;
597 
598 	rs->type = type;
599 	rs->index = -1;
600 	if (type == SOCK_DGRAM) {
601 		rs->udp_sock = -1;
602 		rs->epfd = -1;
603 	}
604 
605 	if (inherited_rs) {
606 		rs->sbuf_size = inherited_rs->sbuf_size;
607 		rs->rbuf_size = inherited_rs->rbuf_size;
608 		rs->sq_inline = inherited_rs->sq_inline;
609 		rs->sq_size = inherited_rs->sq_size;
610 		rs->rq_size = inherited_rs->rq_size;
611 		if (type == SOCK_STREAM) {
612 			rs->ctrl_max_seqno = inherited_rs->ctrl_max_seqno;
613 			rs->target_iomap_size = inherited_rs->target_iomap_size;
614 		}
615 	} else {
616 		rs->sbuf_size = def_wmem;
617 		rs->rbuf_size = def_mem;
618 		rs->sq_inline = def_inline;
619 		rs->sq_size = def_sqsize;
620 		rs->rq_size = def_rqsize;
621 		if (type == SOCK_STREAM) {
622 			rs->ctrl_max_seqno = RS_QP_CTRL_SIZE;
623 			rs->target_iomap_size = def_iomap_size;
624 		}
625 	}
626 	fastlock_init(&rs->slock);
627 	fastlock_init(&rs->rlock);
628 	fastlock_init(&rs->cq_lock);
629 	fastlock_init(&rs->cq_wait_lock);
630 	fastlock_init(&rs->map_lock);
631 	dlist_init(&rs->iomap_list);
632 	dlist_init(&rs->iomap_queue);
633 	return rs;
634 }
635 
636 static int rs_set_nonblocking(struct rsocket *rs, int arg)
637 {
638 	struct ds_qp *qp;
639 	int ret = 0;
640 
641 	if (rs->type == SOCK_STREAM) {
642 		if (rs->cm_id->recv_cq_channel)
643 			ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
644 
645 		if (!ret && rs->state < rs_connected)
646 			ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
647 	} else {
648 		ret = fcntl(rs->epfd, F_SETFL, arg);
649 		if (!ret && rs->qp_list) {
650 			qp = rs->qp_list;
651 			do {
652 				ret = fcntl(qp->cm_id->recv_cq_channel->fd,
653 					    F_SETFL, arg);
654 				qp = ds_next_qp(qp);
655 			} while (qp != rs->qp_list && !ret);
656 		}
657 	}
658 
659 	return ret;
660 }
661 
662 static void rs_set_qp_size(struct rsocket *rs)
663 {
664 	uint16_t max_size;
665 
666 	max_size = min(ucma_max_qpsize(rs->cm_id), RS_QP_MAX_SIZE);
667 
668 	if (rs->sq_size > max_size)
669 		rs->sq_size = max_size;
670 	else if (rs->sq_size < RS_QP_MIN_SIZE)
671 		rs->sq_size = RS_QP_MIN_SIZE;
672 
673 	if (rs->rq_size > max_size)
674 		rs->rq_size = max_size;
675 	else if (rs->rq_size < RS_QP_MIN_SIZE)
676 		rs->rq_size = RS_QP_MIN_SIZE;
677 }
678 
679 static void ds_set_qp_size(struct rsocket *rs)
680 {
681 	uint16_t max_size;
682 
683 	max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
684 
685 	if (rs->sq_size > max_size)
686 		rs->sq_size = max_size;
687 	if (rs->rq_size > max_size)
688 		rs->rq_size = max_size;
689 
690 	if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
691 		rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
692 	else
693 		rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
694 
695 	if (rs->sq_size > (rs->sbuf_size / RS_SNDLOWAT))
696 		rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
697 	else
698 		rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
699 }
700 
701 static int rs_init_bufs(struct rsocket *rs)
702 {
703 	uint32_t total_rbuf_size, total_sbuf_size;
704 	size_t len;
705 
706 	rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
707 	if (!rs->rmsg)
708 		return ERR(ENOMEM);
709 
710 	total_sbuf_size = rs->sbuf_size;
711 	if (rs->sq_inline < RS_MAX_CTRL_MSG)
712 		total_sbuf_size += RS_MAX_CTRL_MSG * RS_QP_CTRL_SIZE;
713 	rs->sbuf = calloc(total_sbuf_size, 1);
714 	if (!rs->sbuf)
715 		return ERR(ENOMEM);
716 
717 	rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, total_sbuf_size);
718 	if (!rs->smr)
719 		return -1;
720 
721 	len = sizeof(*rs->target_sgl) * RS_SGL_SIZE +
722 	      sizeof(*rs->target_iomap) * rs->target_iomap_size;
723 	rs->target_buffer_list = malloc(len);
724 	if (!rs->target_buffer_list)
725 		return ERR(ENOMEM);
726 
727 	rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
728 	if (!rs->target_mr)
729 		return -1;
730 
731 	memset(rs->target_buffer_list, 0, len);
732 	rs->target_sgl = rs->target_buffer_list;
733 	if (rs->target_iomap_size)
734 		rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
735 
736 	total_rbuf_size = rs->rbuf_size;
737 	if (rs->opts & RS_OPT_MSG_SEND)
738 		total_rbuf_size += rs->rq_size * RS_MSG_SIZE;
739 	rs->rbuf = calloc(total_rbuf_size, 1);
740 	if (!rs->rbuf)
741 		return ERR(ENOMEM);
742 
743 	rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, total_rbuf_size);
744 	if (!rs->rmr)
745 		return -1;
746 
747 	rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
748 	rs->sbuf_bytes_avail = rs->sbuf_size;
749 	rs->ssgl[0].lkey = rs->ssgl[1].lkey = rs->smr->lkey;
750 
751 	rs->rbuf_free_offset = rs->rbuf_size >> 1;
752 	rs->rbuf_bytes_avail = rs->rbuf_size >> 1;
753 	rs->sqe_avail = rs->sq_size - rs->ctrl_max_seqno;
754 	rs->rseq_comp = rs->rq_size >> 1;
755 	return 0;
756 }
757 
758 static int ds_init_bufs(struct ds_qp *qp)
759 {
760 	qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), 1);
761 	if (!qp->rbuf)
762 		return ERR(ENOMEM);
763 
764 	qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size);
765 	if (!qp->smr)
766 		return -1;
767 
768 	qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size +
769 						     sizeof(struct ibv_grh));
770 	if (!qp->rmr)
771 		return -1;
772 
773 	return 0;
774 }
775 
776 /*
777  * If a user is waiting on a datagram rsocket through poll or select, then
778  * we need the first completion to generate an event on the related epoll fd
779  * in order to signal the user.  We arm the CQ on creation for this purpose
780  */
781 static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
782 {
783 	cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
784 	if (!cm_id->recv_cq_channel)
785 		return -1;
786 
787 	cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
788 				       cm_id, cm_id->recv_cq_channel, 0);
789 	if (!cm_id->recv_cq)
790 		goto err1;
791 
792 	if (rs->fd_flags & O_NONBLOCK) {
793 		if (fcntl(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK))
794 			goto err2;
795 	}
796 
797 	ibv_req_notify_cq(cm_id->recv_cq, 0);
798 	cm_id->send_cq_channel = cm_id->recv_cq_channel;
799 	cm_id->send_cq = cm_id->recv_cq;
800 	return 0;
801 
802 err2:
803 	ibv_destroy_cq(cm_id->recv_cq);
804 	cm_id->recv_cq = NULL;
805 err1:
806 	ibv_destroy_comp_channel(cm_id->recv_cq_channel);
807 	cm_id->recv_cq_channel = NULL;
808 	return -1;
809 }
810 
811 static inline int rs_post_recv(struct rsocket *rs)
812 {
813 	struct ibv_recv_wr wr, *bad;
814 	struct ibv_sge sge;
815 
816 	wr.next = NULL;
817 	if (!(rs->opts & RS_OPT_MSG_SEND)) {
818 		wr.wr_id = rs_recv_wr_id(0);
819 		wr.sg_list = NULL;
820 		wr.num_sge = 0;
821 	} else {
822 		wr.wr_id = rs_recv_wr_id(rs->rbuf_msg_index);
823 		sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size +
824 			   (rs->rbuf_msg_index * RS_MSG_SIZE);
825 		sge.length = RS_MSG_SIZE;
826 		sge.lkey = rs->rmr->lkey;
827 
828 		wr.sg_list = &sge;
829 		wr.num_sge = 1;
830 		if(++rs->rbuf_msg_index == rs->rq_size)
831 			rs->rbuf_msg_index = 0;
832 	}
833 
834 	return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
835 }
836 
837 static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, uint32_t offset)
838 {
839 	struct ibv_recv_wr wr, *bad;
840 	struct ibv_sge sge[2];
841 
842 	sge[0].addr = (uintptr_t) qp->rbuf + rs->rbuf_size;
843 	sge[0].length = sizeof(struct ibv_grh);
844 	sge[0].lkey = qp->rmr->lkey;
845 	sge[1].addr = (uintptr_t) qp->rbuf + offset;
846 	sge[1].length = RS_SNDLOWAT;
847 	sge[1].lkey = qp->rmr->lkey;
848 
849 	wr.wr_id = rs_recv_wr_id(offset);
850 	wr.next = NULL;
851 	wr.sg_list = sge;
852 	wr.num_sge = 2;
853 
854 	return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad));
855 }
856 
857 static int rs_create_ep(struct rsocket *rs)
858 {
859 	struct ibv_qp_init_attr qp_attr;
860 	int i, ret;
861 
862 	rs_set_qp_size(rs);
863 	if (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IWARP)
864 		rs->opts |= RS_OPT_MSG_SEND;
865 	ret = rs_create_cq(rs, rs->cm_id);
866 	if (ret)
867 		return ret;
868 
869 	memset(&qp_attr, 0, sizeof qp_attr);
870 	qp_attr.qp_context = rs;
871 	qp_attr.send_cq = rs->cm_id->send_cq;
872 	qp_attr.recv_cq = rs->cm_id->recv_cq;
873 	qp_attr.qp_type = IBV_QPT_RC;
874 	qp_attr.sq_sig_all = 1;
875 	qp_attr.cap.max_send_wr = rs->sq_size;
876 	qp_attr.cap.max_recv_wr = rs->rq_size;
877 	qp_attr.cap.max_send_sge = 2;
878 	qp_attr.cap.max_recv_sge = 1;
879 	qp_attr.cap.max_inline_data = rs->sq_inline;
880 
881 	ret = rdma_create_qp(rs->cm_id, NULL, &qp_attr);
882 	if (ret)
883 		return ret;
884 
885 	rs->sq_inline = qp_attr.cap.max_inline_data;
886 	if ((rs->opts & RS_OPT_MSG_SEND) && (rs->sq_inline < RS_MSG_SIZE))
887 		return ERR(ENOTSUP);
888 
889 	ret = rs_init_bufs(rs);
890 	if (ret)
891 		return ret;
892 
893 	for (i = 0; i < rs->rq_size; i++) {
894 		ret = rs_post_recv(rs);
895 		if (ret)
896 			return ret;
897 	}
898 	return 0;
899 }
900 
901 static void rs_release_iomap_mr(struct rs_iomap_mr *iomr)
902 {
903 	if (atomic_fetch_sub(&iomr->refcnt, 1) != 1)
904 		return;
905 
906 	dlist_remove(&iomr->entry);
907 	ibv_dereg_mr(iomr->mr);
908 	if (iomr->index >= 0)
909 		iomr->mr = NULL;
910 	else
911 		free(iomr);
912 }
913 
914 static void rs_free_iomappings(struct rsocket *rs)
915 {
916 	struct rs_iomap_mr *iomr;
917 
918 	while (!dlist_empty(&rs->iomap_list)) {
919 		iomr = container_of(rs->iomap_list.next,
920 				    struct rs_iomap_mr, entry);
921 		riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
922 	}
923 	while (!dlist_empty(&rs->iomap_queue)) {
924 		iomr = container_of(rs->iomap_queue.next,
925 				    struct rs_iomap_mr, entry);
926 		riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
927 	}
928 }
929 
930 static void ds_free_qp(struct ds_qp *qp)
931 {
932 	if (qp->smr)
933 		rdma_dereg_mr(qp->smr);
934 
935 	if (qp->rbuf) {
936 		if (qp->rmr)
937 			rdma_dereg_mr(qp->rmr);
938 		free(qp->rbuf);
939 	}
940 
941 	if (qp->cm_id) {
942 		if (qp->cm_id->qp) {
943 			tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr);
944 			epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
945 				  qp->cm_id->recv_cq_channel->fd, NULL);
946 			rdma_destroy_qp(qp->cm_id);
947 		}
948 		rdma_destroy_id(qp->cm_id);
949 	}
950 
951 	free(qp);
952 }
953 
954 static void ds_free(struct rsocket *rs)
955 {
956 	struct ds_qp *qp;
957 
958 	if (rs->udp_sock >= 0)
959 		close(rs->udp_sock);
960 
961 	if (rs->index >= 0)
962 		rs_remove(rs);
963 
964 	if (rs->dmsg)
965 		free(rs->dmsg);
966 
967 	while ((qp = rs->qp_list)) {
968 		ds_remove_qp(rs, qp);
969 		ds_free_qp(qp);
970 	}
971 
972 	if (rs->epfd >= 0)
973 		close(rs->epfd);
974 
975 	if (rs->sbuf)
976 		free(rs->sbuf);
977 
978 	tdestroy(rs->dest_map, free);
979 	fastlock_destroy(&rs->map_lock);
980 	fastlock_destroy(&rs->cq_wait_lock);
981 	fastlock_destroy(&rs->cq_lock);
982 	fastlock_destroy(&rs->rlock);
983 	fastlock_destroy(&rs->slock);
984 	free(rs);
985 }
986 
987 static void rs_free(struct rsocket *rs)
988 {
989 	if (rs->type == SOCK_DGRAM) {
990 		ds_free(rs);
991 		return;
992 	}
993 
994 	if (rs->rmsg)
995 		free(rs->rmsg);
996 
997 	if (rs->sbuf) {
998 		if (rs->smr)
999 			rdma_dereg_mr(rs->smr);
1000 		free(rs->sbuf);
1001 	}
1002 
1003 	if (rs->rbuf) {
1004 		if (rs->rmr)
1005 			rdma_dereg_mr(rs->rmr);
1006 		free(rs->rbuf);
1007 	}
1008 
1009 	if (rs->target_buffer_list) {
1010 		if (rs->target_mr)
1011 			rdma_dereg_mr(rs->target_mr);
1012 		free(rs->target_buffer_list);
1013 	}
1014 
1015 	if (rs->cm_id) {
1016 		rs_free_iomappings(rs);
1017 		if (rs->cm_id->qp) {
1018 			ibv_ack_cq_events(rs->cm_id->recv_cq, rs->unack_cqe);
1019 			rdma_destroy_qp(rs->cm_id);
1020 		}
1021 		rdma_destroy_id(rs->cm_id);
1022 	}
1023 
1024 	if (rs->index >= 0)
1025 		rs_remove(rs);
1026 
1027 	fastlock_destroy(&rs->map_lock);
1028 	fastlock_destroy(&rs->cq_wait_lock);
1029 	fastlock_destroy(&rs->cq_lock);
1030 	fastlock_destroy(&rs->rlock);
1031 	fastlock_destroy(&rs->slock);
1032 	free(rs);
1033 }
1034 
1035 static size_t rs_conn_data_offset(struct rsocket *rs)
1036 {
1037 	return (rs->cm_id->route.addr.src_addr.sa_family == AF_IB) ?
1038 		sizeof(struct ib_connect_hdr) : 0;
1039 }
1040 
1041 static void rs_format_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
1042 {
1043 	conn->version = 1;
1044 	conn->flags = RS_CONN_FLAG_IOMAP |
1045 		      (rs_host_is_net() ? RS_CONN_FLAG_NET : 0);
1046 	conn->credits = htobe16(rs->rq_size);
1047 	memset(conn->reserved, 0, sizeof conn->reserved);
1048 	conn->target_iomap_size = (uint8_t) rs_value_to_scale(rs->target_iomap_size, 8);
1049 
1050 	conn->target_sgl.addr = (__force uint64_t)htobe64((uintptr_t) rs->target_sgl);
1051 	conn->target_sgl.length = (__force uint32_t)htobe32(RS_SGL_SIZE);
1052 	conn->target_sgl.key = (__force uint32_t)htobe32(rs->target_mr->rkey);
1053 
1054 	conn->data_buf.addr = (__force uint64_t)htobe64((uintptr_t) rs->rbuf);
1055 	conn->data_buf.length = (__force uint32_t)htobe32(rs->rbuf_size >> 1);
1056 	conn->data_buf.key = (__force uint32_t)htobe32(rs->rmr->rkey);
1057 }
1058 
1059 static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
1060 {
1061 	rs->remote_sgl.addr = be64toh((__force __be64)conn->target_sgl.addr);
1062 	rs->remote_sgl.length = be32toh((__force __be32)conn->target_sgl.length);
1063 	rs->remote_sgl.key = be32toh((__force __be32)conn->target_sgl.key);
1064 	rs->remote_sge = 1;
1065 	if ((rs_host_is_net() && !(conn->flags & RS_CONN_FLAG_NET)) ||
1066 	    (!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET)))
1067 		rs->opts = RS_OPT_SWAP_SGL;
1068 
1069 	if (conn->flags & RS_CONN_FLAG_IOMAP) {
1070 		rs->remote_iomap.addr = rs->remote_sgl.addr +
1071 					sizeof(rs->remote_sgl) * rs->remote_sgl.length;
1072 		rs->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8);
1073 		rs->remote_iomap.key = rs->remote_sgl.key;
1074 	}
1075 
1076 	rs->target_sgl[0].addr = be64toh((__force __be64)conn->data_buf.addr);
1077 	rs->target_sgl[0].length = be32toh((__force __be32)conn->data_buf.length);
1078 	rs->target_sgl[0].key = be32toh((__force __be32)conn->data_buf.key);
1079 
1080 	rs->sseq_comp = be16toh(conn->credits);
1081 }
1082 
1083 static int ds_init(struct rsocket *rs, int domain)
1084 {
1085 	rs->udp_sock = socket(domain, SOCK_DGRAM, 0);
1086 	if (rs->udp_sock < 0)
1087 		return rs->udp_sock;
1088 
1089 	rs->epfd = epoll_create(2);
1090 	if (rs->epfd < 0)
1091 		return rs->epfd;
1092 
1093 	return 0;
1094 }
1095 
1096 static int ds_init_ep(struct rsocket *rs)
1097 {
1098 	struct ds_smsg *msg;
1099 	int i, ret;
1100 
1101 	ds_set_qp_size(rs);
1102 
1103 	rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
1104 	if (!rs->sbuf)
1105 		return ERR(ENOMEM);
1106 
1107 	rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
1108 	if (!rs->dmsg)
1109 		return ERR(ENOMEM);
1110 
1111 	rs->sqe_avail = rs->sq_size;
1112 	rs->rqe_avail = rs->rq_size;
1113 
1114 	rs->smsg_free = (struct ds_smsg *) rs->sbuf;
1115 	msg = rs->smsg_free;
1116 	for (i = 0; i < rs->sq_size - 1; i++) {
1117 		msg->next = (void *) msg + RS_SNDLOWAT;
1118 		msg = msg->next;
1119 	}
1120 	msg->next = NULL;
1121 
1122 	ret = rs_notify_svc(&udp_svc, rs, RS_SVC_ADD_DGRAM);
1123 	if (ret)
1124 		return ret;
1125 
1126 	rs->state = rs_readable | rs_writable;
1127 	return 0;
1128 }
1129 
1130 int rsocket(int domain, int type, int protocol)
1131 {
1132 	struct rsocket *rs;
1133 	int index, ret;
1134 
1135 	if ((domain != AF_INET && domain != AF_INET6 && domain != AF_IB) ||
1136 	    ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) ||
1137 	    (type == SOCK_STREAM && protocol && protocol != IPPROTO_TCP) ||
1138 	    (type == SOCK_DGRAM && protocol && protocol != IPPROTO_UDP))
1139 		return ERR(ENOTSUP);
1140 
1141 	rs_configure();
1142 	rs = rs_alloc(NULL, type);
1143 	if (!rs)
1144 		return ERR(ENOMEM);
1145 
1146 	if (type == SOCK_STREAM) {
1147 		ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
1148 		if (ret)
1149 			goto err;
1150 
1151 		rs->cm_id->route.addr.src_addr.sa_family = domain;
1152 		index = rs->cm_id->channel->fd;
1153 	} else {
1154 		ret = ds_init(rs, domain);
1155 		if (ret)
1156 			goto err;
1157 
1158 		index = rs->udp_sock;
1159 	}
1160 
1161 	ret = rs_insert(rs, index);
1162 	if (ret < 0)
1163 		goto err;
1164 
1165 	return rs->index;
1166 
1167 err:
1168 	rs_free(rs);
1169 	return ret;
1170 }
1171 
1172 int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
1173 {
1174 	struct rsocket *rs;
1175 	int ret;
1176 
1177 	rs = idm_lookup(&idm, socket);
1178 	if (!rs)
1179 		return ERR(EBADF);
1180 	if (rs->type == SOCK_STREAM) {
1181 		ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr);
1182 		if (!ret)
1183 			rs->state = rs_bound;
1184 	} else {
1185 		if (rs->state == rs_init) {
1186 			ret = ds_init_ep(rs);
1187 			if (ret)
1188 				return ret;
1189 		}
1190 		ret = bind(rs->udp_sock, addr, addrlen);
1191 	}
1192 	return ret;
1193 }
1194 
1195 int rlisten(int socket, int backlog)
1196 {
1197 	struct rsocket *rs;
1198 	int ret;
1199 
1200 	rs = idm_lookup(&idm, socket);
1201 	if (!rs)
1202 		return ERR(EBADF);
1203 
1204 	if (rs->state != rs_listening) {
1205 		ret = rdma_listen(rs->cm_id, backlog);
1206 		if (!ret)
1207 			rs->state = rs_listening;
1208 	} else {
1209 		ret = 0;
1210 	}
1211 	return ret;
1212 }
1213 
1214 /*
1215  * Nonblocking is usually not inherited between sockets, but we need to
1216  * inherit it here to establish the connection only.  This is needed to
1217  * prevent rdma_accept from blocking until the remote side finishes
1218  * establishing the connection.  If we were to allow rdma_accept to block,
1219  * then a single thread cannot establish a connection with itself, or
1220  * two threads which try to connect to each other can deadlock trying to
1221  * form a connection.
1222  *
1223  * Data transfers on the new socket remain blocking unless the user
1224  * specifies otherwise through rfcntl.
1225  */
1226 int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
1227 {
1228 	struct rsocket *rs, *new_rs;
1229 	struct rdma_conn_param param;
1230 	struct rs_conn_data *creq, cresp;
1231 	int ret;
1232 
1233 	rs = idm_lookup(&idm, socket);
1234 	if (!rs)
1235 		return ERR(EBADF);
1236 	new_rs = rs_alloc(rs, rs->type);
1237 	if (!new_rs)
1238 		return ERR(ENOMEM);
1239 
1240 	ret = rdma_get_request(rs->cm_id, &new_rs->cm_id);
1241 	if (ret)
1242 		goto err;
1243 
1244 	ret = rs_insert(new_rs, new_rs->cm_id->channel->fd);
1245 	if (ret < 0)
1246 		goto err;
1247 
1248 	creq = (struct rs_conn_data *)
1249 	       (new_rs->cm_id->event->param.conn.private_data + rs_conn_data_offset(rs));
1250 	if (creq->version != 1) {
1251 		ret = ERR(ENOTSUP);
1252 		goto err;
1253 	}
1254 
1255 	if (rs->fd_flags & O_NONBLOCK)
1256 		fcntl(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
1257 
1258 	ret = rs_create_ep(new_rs);
1259 	if (ret)
1260 		goto err;
1261 
1262 	rs_save_conn_data(new_rs, creq);
1263 	param = new_rs->cm_id->event->param.conn;
1264 	rs_format_conn_data(new_rs, &cresp);
1265 	param.private_data = &cresp;
1266 	param.private_data_len = sizeof cresp;
1267 	ret = rdma_accept(new_rs->cm_id, &param);
1268 	if (!ret)
1269 		new_rs->state = rs_connect_rdwr;
1270 	else if (errno == EAGAIN || errno == EWOULDBLOCK)
1271 		new_rs->state = rs_accepting;
1272 	else
1273 		goto err;
1274 
1275 	if (addr && addrlen)
1276 		rgetpeername(new_rs->index, addr, addrlen);
1277 	return new_rs->index;
1278 
1279 err:
1280 	rs_free(new_rs);
1281 	return ret;
1282 }
1283 
1284 static int rs_do_connect(struct rsocket *rs)
1285 {
1286 	struct rdma_conn_param param;
1287 	struct rs_conn_private_data cdata;
1288 	struct rs_conn_data *creq, *cresp;
1289 	int to, ret;
1290 
1291 	switch (rs->state) {
1292 	case rs_init:
1293 	case rs_bound:
1294 resolve_addr:
1295 		to = 1000 << rs->retries++;
1296 		ret = rdma_resolve_addr(rs->cm_id, NULL,
1297 					&rs->cm_id->route.addr.dst_addr, to);
1298 		if (!ret)
1299 			goto resolve_route;
1300 		if (errno == EAGAIN || errno == EWOULDBLOCK)
1301 			rs->state = rs_resolving_addr;
1302 		break;
1303 	case rs_resolving_addr:
1304 		ret = ucma_complete(rs->cm_id);
1305 		if (ret) {
1306 			if (errno == ETIMEDOUT && rs->retries <= RS_CONN_RETRIES)
1307 				goto resolve_addr;
1308 			break;
1309 		}
1310 
1311 		rs->retries = 0;
1312 resolve_route:
1313 		to = 1000 << rs->retries++;
1314 		if (rs->optval) {
1315 			ret = rdma_set_option(rs->cm_id,  RDMA_OPTION_IB,
1316 					      RDMA_OPTION_IB_PATH, rs->optval,
1317 					      rs->optlen);
1318 			free(rs->optval);
1319 			rs->optval = NULL;
1320 			if (!ret) {
1321 				rs->state = rs_resolving_route;
1322 				goto resolving_route;
1323 			}
1324 		} else {
1325 			ret = rdma_resolve_route(rs->cm_id, to);
1326 			if (!ret)
1327 				goto do_connect;
1328 		}
1329 		if (errno == EAGAIN || errno == EWOULDBLOCK)
1330 			rs->state = rs_resolving_route;
1331 		break;
1332 	case rs_resolving_route:
1333 resolving_route:
1334 		ret = ucma_complete(rs->cm_id);
1335 		if (ret) {
1336 			if (errno == ETIMEDOUT && rs->retries <= RS_CONN_RETRIES)
1337 				goto resolve_route;
1338 			break;
1339 		}
1340 do_connect:
1341 		ret = rs_create_ep(rs);
1342 		if (ret)
1343 			break;
1344 
1345 		memset(&param, 0, sizeof param);
1346 		creq = (void *) &cdata + rs_conn_data_offset(rs);
1347 		rs_format_conn_data(rs, creq);
1348 		param.private_data = (void *) creq - rs_conn_data_offset(rs);
1349 		param.private_data_len = sizeof(*creq) + rs_conn_data_offset(rs);
1350 		param.flow_control = 1;
1351 		param.retry_count = 7;
1352 		param.rnr_retry_count = 7;
1353 		/* work-around: iWarp issues RDMA read during connection */
1354 		if (rs->opts & RS_OPT_MSG_SEND)
1355 			param.initiator_depth = 1;
1356 		rs->retries = 0;
1357 
1358 		ret = rdma_connect(rs->cm_id, &param);
1359 		if (!ret)
1360 			goto connected;
1361 		if (errno == EAGAIN || errno == EWOULDBLOCK)
1362 			rs->state = rs_connecting;
1363 		break;
1364 	case rs_connecting:
1365 		ret = ucma_complete(rs->cm_id);
1366 		if (ret)
1367 			break;
1368 connected:
1369 		cresp = (struct rs_conn_data *) rs->cm_id->event->param.conn.private_data;
1370 		if (cresp->version != 1) {
1371 			ret = ERR(ENOTSUP);
1372 			break;
1373 		}
1374 
1375 		rs_save_conn_data(rs, cresp);
1376 		rs->state = rs_connect_rdwr;
1377 		break;
1378 	case rs_accepting:
1379 		if (!(rs->fd_flags & O_NONBLOCK))
1380 			fcntl(rs->cm_id->channel->fd, F_SETFL, 0);
1381 
1382 		ret = ucma_complete(rs->cm_id);
1383 		if (ret)
1384 			break;
1385 
1386 		rs->state = rs_connect_rdwr;
1387 		break;
1388 	default:
1389 		ret = ERR(EINVAL);
1390 		break;
1391 	}
1392 
1393 	if (ret) {
1394 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
1395 			errno = EINPROGRESS;
1396 		} else {
1397 			rs->state = rs_connect_error;
1398 			rs->err = errno;
1399 		}
1400 	}
1401 	return ret;
1402 }
1403 
1404 static int rs_any_addr(const union socket_addr *addr)
1405 {
1406 	if (addr->sa.sa_family == AF_INET) {
1407 		return (addr->sin.sin_addr.s_addr == htobe32(INADDR_ANY) ||
1408 			addr->sin.sin_addr.s_addr == htobe32(INADDR_LOOPBACK));
1409 	} else {
1410 		return (!memcmp(&addr->sin6.sin6_addr, &in6addr_any, 16) ||
1411 			!memcmp(&addr->sin6.sin6_addr, &in6addr_loopback, 16));
1412 	}
1413 }
1414 
1415 static int ds_get_src_addr(struct rsocket *rs,
1416 			   const struct sockaddr *dest_addr, socklen_t dest_len,
1417 			   union socket_addr *src_addr, socklen_t *src_len)
1418 {
1419 	int sock, ret;
1420 	__be16 port;
1421 
1422 	*src_len = sizeof(*src_addr);
1423 	ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
1424 	if (ret || !rs_any_addr(src_addr))
1425 		return ret;
1426 
1427 	port = src_addr->sin.sin_port;
1428 	sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0);
1429 	if (sock < 0)
1430 		return sock;
1431 
1432 	ret = connect(sock, dest_addr, dest_len);
1433 	if (ret)
1434 		goto out;
1435 
1436 	*src_len = sizeof(*src_addr);
1437 	ret = getsockname(sock, &src_addr->sa, src_len);
1438 	src_addr->sin.sin_port = port;
1439 out:
1440 	close(sock);
1441 	return ret;
1442 }
1443 
1444 static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
1445 {
1446 	if (addr->sa.sa_family == AF_INET) {
1447 		hdr->version = 4;
1448 		hdr->length = DS_IPV4_HDR_LEN;
1449 		hdr->port = addr->sin.sin_port;
1450 		hdr->addr.ipv4 = addr->sin.sin_addr.s_addr;
1451 	} else {
1452 		hdr->version = 6;
1453 		hdr->length = DS_IPV6_HDR_LEN;
1454 		hdr->port = addr->sin6.sin6_port;
1455 		hdr->addr.ipv6.flowinfo= addr->sin6.sin6_flowinfo;
1456 		memcpy(&hdr->addr.ipv6.addr, &addr->sin6.sin6_addr, 16);
1457 	}
1458 }
1459 
1460 static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
1461 			  socklen_t addrlen)
1462 {
1463 	struct ibv_port_attr port_attr;
1464 	struct ibv_ah_attr attr;
1465 	int ret;
1466 
1467 	memcpy(&qp->dest.addr, addr, addrlen);
1468 	qp->dest.qp = qp;
1469 	qp->dest.qpn = qp->cm_id->qp->qp_num;
1470 
1471 	ret = ibv_query_port(qp->cm_id->verbs, qp->cm_id->port_num, &port_attr);
1472 	if (ret)
1473 		return ret;
1474 
1475 	memset(&attr, 0, sizeof attr);
1476 	attr.dlid = port_attr.lid;
1477 	attr.port_num = qp->cm_id->port_num;
1478 	qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
1479 	if (!qp->dest.ah)
1480 		return ERR(ENOMEM);
1481 
1482 	tsearch(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr);
1483 	return 0;
1484 }
1485 
1486 static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
1487 			socklen_t addrlen, struct ds_qp **new_qp)
1488 {
1489 	struct ds_qp *qp;
1490 	struct ibv_qp_init_attr qp_attr;
1491 	struct epoll_event event;
1492 	int i, ret;
1493 
1494 	qp = calloc(1, sizeof(*qp));
1495 	if (!qp)
1496 		return ERR(ENOMEM);
1497 
1498 	qp->rs = rs;
1499 	ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP);
1500 	if (ret)
1501 		goto err;
1502 
1503 	ds_format_hdr(&qp->hdr, src_addr);
1504 	ret = rdma_bind_addr(qp->cm_id, &src_addr->sa);
1505 	if (ret)
1506 		goto err;
1507 
1508 	ret = ds_init_bufs(qp);
1509 	if (ret)
1510 		goto err;
1511 
1512 	ret = rs_create_cq(rs, qp->cm_id);
1513 	if (ret)
1514 		goto err;
1515 
1516 	memset(&qp_attr, 0, sizeof qp_attr);
1517 	qp_attr.qp_context = qp;
1518 	qp_attr.send_cq = qp->cm_id->send_cq;
1519 	qp_attr.recv_cq = qp->cm_id->recv_cq;
1520 	qp_attr.qp_type = IBV_QPT_UD;
1521 	qp_attr.sq_sig_all = 1;
1522 	qp_attr.cap.max_send_wr = rs->sq_size;
1523 	qp_attr.cap.max_recv_wr = rs->rq_size;
1524 	qp_attr.cap.max_send_sge = 1;
1525 	qp_attr.cap.max_recv_sge = 2;
1526 	qp_attr.cap.max_inline_data = rs->sq_inline;
1527 	ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr);
1528 	if (ret)
1529 		goto err;
1530 
1531 	rs->sq_inline = qp_attr.cap.max_inline_data;
1532 	ret = ds_add_qp_dest(qp, src_addr, addrlen);
1533 	if (ret)
1534 		goto err;
1535 
1536 	event.events = EPOLLIN;
1537 	event.data.ptr = qp;
1538 	ret = epoll_ctl(rs->epfd,  EPOLL_CTL_ADD,
1539 			qp->cm_id->recv_cq_channel->fd, &event);
1540 	if (ret)
1541 		goto err;
1542 
1543 	for (i = 0; i < rs->rq_size; i++) {
1544 		ret = ds_post_recv(rs, qp, i * RS_SNDLOWAT);
1545 		if (ret)
1546 			goto err;
1547 	}
1548 
1549 	ds_insert_qp(rs, qp);
1550 	*new_qp = qp;
1551 	return 0;
1552 err:
1553 	ds_free_qp(qp);
1554 	return ret;
1555 }
1556 
1557 static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
1558 		     socklen_t addrlen, struct ds_qp **qp)
1559 {
1560 	if (rs->qp_list) {
1561 		*qp = rs->qp_list;
1562 		do {
1563 			if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id),
1564 					     src_addr))
1565 				return 0;
1566 
1567 			*qp = ds_next_qp(*qp);
1568 		} while (*qp != rs->qp_list);
1569 	}
1570 
1571 	return ds_create_qp(rs, src_addr, addrlen, qp);
1572 }
1573 
1574 static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
1575 		       socklen_t addrlen, struct ds_dest **dest)
1576 {
1577 	union socket_addr src_addr;
1578 	socklen_t src_len;
1579 	struct ds_qp *qp;
1580 	struct ds_dest **tdest, *new_dest;
1581 	int ret = 0;
1582 
1583 	fastlock_acquire(&rs->map_lock);
1584 	tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
1585 	if (tdest)
1586 		goto found;
1587 
1588 	ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
1589 	if (ret)
1590 		goto out;
1591 
1592 	ret = ds_get_qp(rs, &src_addr, src_len, &qp);
1593 	if (ret)
1594 		goto out;
1595 
1596 	tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
1597 	if (!tdest) {
1598 		new_dest = calloc(1, sizeof(*new_dest));
1599 		if (!new_dest) {
1600 			ret = ERR(ENOMEM);
1601 			goto out;
1602 		}
1603 
1604 		memcpy(&new_dest->addr, addr, addrlen);
1605 		new_dest->qp = qp;
1606 		tdest = tsearch(&new_dest->addr, &rs->dest_map, ds_compare_addr);
1607 	}
1608 
1609 found:
1610 	*dest = *tdest;
1611 out:
1612 	fastlock_release(&rs->map_lock);
1613 	return ret;
1614 }
1615 
1616 int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
1617 {
1618 	struct rsocket *rs;
1619 	int ret;
1620 
1621 	rs = idm_lookup(&idm, socket);
1622 	if (!rs)
1623 		return ERR(EBADF);
1624 	if (rs->type == SOCK_STREAM) {
1625 		memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
1626 		ret = rs_do_connect(rs);
1627 	} else {
1628 		if (rs->state == rs_init) {
1629 			ret = ds_init_ep(rs);
1630 			if (ret)
1631 				return ret;
1632 		}
1633 
1634 		fastlock_acquire(&rs->slock);
1635 		ret = connect(rs->udp_sock, addr, addrlen);
1636 		if (!ret)
1637 			ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
1638 		fastlock_release(&rs->slock);
1639 	}
1640 	return ret;
1641 }
1642 
1643 static void *rs_get_ctrl_buf(struct rsocket *rs)
1644 {
1645 	return rs->sbuf + rs->sbuf_size +
1646 		RS_MAX_CTRL_MSG * (rs->ctrl_seqno & (RS_QP_CTRL_SIZE - 1));
1647 }
1648 
1649 static int rs_post_msg(struct rsocket *rs, uint32_t msg)
1650 {
1651 	struct ibv_send_wr wr, *bad;
1652 	struct ibv_sge sge;
1653 
1654 	wr.wr_id = rs_send_wr_id(msg);
1655 	wr.next = NULL;
1656 	if (!(rs->opts & RS_OPT_MSG_SEND)) {
1657 		wr.sg_list = NULL;
1658 		wr.num_sge = 0;
1659 		wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
1660 		wr.send_flags = 0;
1661 		wr.imm_data = htobe32(msg);
1662 	} else {
1663 		sge.addr = (uintptr_t) &msg;
1664 		sge.lkey = 0;
1665 		sge.length = sizeof msg;
1666 		wr.sg_list = &sge;
1667 		wr.num_sge = 1;
1668 		wr.opcode = IBV_WR_SEND;
1669 		wr.send_flags = IBV_SEND_INLINE;
1670 	}
1671 
1672 	return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
1673 }
1674 
1675 static int rs_post_write(struct rsocket *rs,
1676 			 struct ibv_sge *sgl, int nsge,
1677 			 uint32_t wr_data, int flags,
1678 			 uint64_t addr, uint32_t rkey)
1679 {
1680 	struct ibv_send_wr wr, *bad;
1681 
1682 	wr.wr_id = rs_send_wr_id(wr_data);
1683 	wr.next = NULL;
1684 	wr.sg_list = sgl;
1685 	wr.num_sge = nsge;
1686 	wr.opcode = IBV_WR_RDMA_WRITE;
1687 	wr.send_flags = flags;
1688 	wr.wr.rdma.remote_addr = addr;
1689 	wr.wr.rdma.rkey = rkey;
1690 
1691 	return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
1692 }
1693 
1694 static int rs_post_write_msg(struct rsocket *rs,
1695 			 struct ibv_sge *sgl, int nsge,
1696 			 uint32_t msg, int flags,
1697 			 uint64_t addr, uint32_t rkey)
1698 {
1699 	struct ibv_send_wr wr, *bad;
1700 	struct ibv_sge sge;
1701 	int ret;
1702 
1703 	wr.next = NULL;
1704 	if (!(rs->opts & RS_OPT_MSG_SEND)) {
1705 		wr.wr_id = rs_send_wr_id(msg);
1706 		wr.sg_list = sgl;
1707 		wr.num_sge = nsge;
1708 		wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
1709 		wr.send_flags = flags;
1710 		wr.imm_data = htobe32(msg);
1711 		wr.wr.rdma.remote_addr = addr;
1712 		wr.wr.rdma.rkey = rkey;
1713 
1714 		return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
1715 	} else {
1716 		ret = rs_post_write(rs, sgl, nsge, msg, flags, addr, rkey);
1717 		if (!ret) {
1718 			wr.wr_id = rs_send_wr_id(rs_msg_set(rs_msg_op(msg), 0)) |
1719 				   RS_WR_ID_FLAG_MSG_SEND;
1720 			sge.addr = (uintptr_t) &msg;
1721 			sge.lkey = 0;
1722 			sge.length = sizeof msg;
1723 			wr.sg_list = &sge;
1724 			wr.num_sge = 1;
1725 			wr.opcode = IBV_WR_SEND;
1726 			wr.send_flags = IBV_SEND_INLINE;
1727 
1728 			ret = rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
1729 		}
1730 		return ret;
1731 	}
1732 }
1733 
1734 static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge,
1735 			uint32_t wr_data)
1736 {
1737 	struct ibv_send_wr wr, *bad;
1738 
1739 	wr.wr_id = rs_send_wr_id(wr_data);
1740 	wr.next = NULL;
1741 	wr.sg_list = sge;
1742 	wr.num_sge = 1;
1743 	wr.opcode = IBV_WR_SEND;
1744 	wr.send_flags = (sge->length <= rs->sq_inline) ? IBV_SEND_INLINE : 0;
1745 	wr.wr.ud.ah = rs->conn_dest->ah;
1746 	wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
1747 	wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
1748 
1749 	return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
1750 }
1751 
1752 /*
1753  * Update target SGE before sending data.  Otherwise the remote side may
1754  * update the entry before we do.
1755  */
1756 static int rs_write_data(struct rsocket *rs,
1757 			 struct ibv_sge *sgl, int nsge,
1758 			 uint32_t length, int flags)
1759 {
1760 	uint64_t addr;
1761 	uint32_t rkey;
1762 
1763 	rs->sseq_no++;
1764 	rs->sqe_avail--;
1765 	if (rs->opts & RS_OPT_MSG_SEND)
1766 		rs->sqe_avail--;
1767 	rs->sbuf_bytes_avail -= length;
1768 
1769 	addr = rs->target_sgl[rs->target_sge].addr;
1770 	rkey = rs->target_sgl[rs->target_sge].key;
1771 
1772 	rs->target_sgl[rs->target_sge].addr += length;
1773 	rs->target_sgl[rs->target_sge].length -= length;
1774 
1775 	if (!rs->target_sgl[rs->target_sge].length) {
1776 		if (++rs->target_sge == RS_SGL_SIZE)
1777 			rs->target_sge = 0;
1778 	}
1779 
1780 	return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
1781 				 flags, addr, rkey);
1782 }
1783 
1784 static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
1785 			   struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
1786 {
1787 	uint64_t addr;
1788 
1789 	rs->sqe_avail--;
1790 	rs->sbuf_bytes_avail -= length;
1791 
1792 	addr = iom->sge.addr + offset - iom->offset;
1793 	return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length),
1794 			     flags, addr, iom->sge.key);
1795 }
1796 
1797 static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
1798 			  struct ibv_sge *sgl, int nsge, int flags)
1799 {
1800 	uint64_t addr;
1801 
1802 	rs->sseq_no++;
1803 	rs->sqe_avail--;
1804 	if (rs->opts & RS_OPT_MSG_SEND)
1805 		rs->sqe_avail--;
1806 	rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
1807 
1808 	addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
1809 	return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
1810 				 flags, addr, rs->remote_iomap.key);
1811 }
1812 
1813 static uint32_t rs_sbuf_left(struct rsocket *rs)
1814 {
1815 	return (uint32_t) (((uint64_t) (uintptr_t) &rs->sbuf[rs->sbuf_size]) -
1816 			   rs->ssgl[0].addr);
1817 }
1818 
1819 static void rs_send_credits(struct rsocket *rs)
1820 {
1821 	struct ibv_sge ibsge;
1822 	struct rs_sge sge, *sge_buf;
1823 	int flags;
1824 
1825 	rs->ctrl_seqno++;
1826 	rs->rseq_comp = rs->rseq_no + (rs->rq_size >> 1);
1827 	if (rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) {
1828 		if (rs->opts & RS_OPT_MSG_SEND)
1829 			rs->ctrl_seqno++;
1830 
1831 		if (!(rs->opts & RS_OPT_SWAP_SGL)) {
1832 			sge.addr = (uintptr_t) &rs->rbuf[rs->rbuf_free_offset];
1833 			sge.key = rs->rmr->rkey;
1834 			sge.length = rs->rbuf_size >> 1;
1835 		} else {
1836 			sge.addr = bswap_64((uintptr_t) &rs->rbuf[rs->rbuf_free_offset]);
1837 			sge.key = bswap_32(rs->rmr->rkey);
1838 			sge.length = bswap_32(rs->rbuf_size >> 1);
1839 		}
1840 
1841 		if (rs->sq_inline < sizeof sge) {
1842 			sge_buf = rs_get_ctrl_buf(rs);
1843 			memcpy(sge_buf, &sge, sizeof sge);
1844 			ibsge.addr = (uintptr_t) sge_buf;
1845 			ibsge.lkey = rs->smr->lkey;
1846 			flags = 0;
1847 		} else {
1848 			ibsge.addr = (uintptr_t) &sge;
1849 			ibsge.lkey = 0;
1850 			flags = IBV_SEND_INLINE;
1851 		}
1852 		ibsge.length = sizeof(sge);
1853 
1854 		rs_post_write_msg(rs, &ibsge, 1,
1855 			rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), flags,
1856 			rs->remote_sgl.addr + rs->remote_sge * sizeof(struct rs_sge),
1857 			rs->remote_sgl.key);
1858 
1859 		rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
1860 		rs->rbuf_free_offset += rs->rbuf_size >> 1;
1861 		if (rs->rbuf_free_offset >= rs->rbuf_size)
1862 			rs->rbuf_free_offset = 0;
1863 		if (++rs->remote_sge == rs->remote_sgl.length)
1864 			rs->remote_sge = 0;
1865 	} else {
1866 		rs_post_msg(rs, rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size));
1867 	}
1868 }
1869 
1870 static inline int rs_ctrl_avail(struct rsocket *rs)
1871 {
1872 	return rs->ctrl_seqno != rs->ctrl_max_seqno;
1873 }
1874 
1875 /* Protocols that do not support RDMA write with immediate may require 2 msgs */
1876 static inline int rs_2ctrl_avail(struct rsocket *rs)
1877 {
1878 	return (int)((rs->ctrl_seqno + 1) - rs->ctrl_max_seqno) < 0;
1879 }
1880 
1881 static int rs_give_credits(struct rsocket *rs)
1882 {
1883 	if (!(rs->opts & RS_OPT_MSG_SEND)) {
1884 		return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
1885 			((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
1886 		       rs_ctrl_avail(rs) && (rs->state & rs_connected);
1887 	} else {
1888 		return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
1889 			((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
1890 		       rs_2ctrl_avail(rs) && (rs->state & rs_connected);
1891 	}
1892 }
1893 
1894 static void rs_update_credits(struct rsocket *rs)
1895 {
1896 	if (rs_give_credits(rs))
1897 		rs_send_credits(rs);
1898 }
1899 
1900 static int rs_poll_cq(struct rsocket *rs)
1901 {
1902 	struct ibv_wc wc;
1903 	uint32_t msg;
1904 	int ret, rcnt = 0;
1905 
1906 	while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) {
1907 		if (rs_wr_is_recv(wc.wr_id)) {
1908 			if (wc.status != IBV_WC_SUCCESS)
1909 				continue;
1910 			rcnt++;
1911 
1912 			if (wc.wc_flags & IBV_WC_WITH_IMM) {
1913 				msg = be32toh(wc.imm_data);
1914 			} else {
1915 				msg = ((uint32_t *) (rs->rbuf + rs->rbuf_size))
1916 					[rs_wr_data(wc.wr_id)];
1917 
1918 			}
1919 			switch (rs_msg_op(msg)) {
1920 			case RS_OP_SGL:
1921 				rs->sseq_comp = (uint16_t) rs_msg_data(msg);
1922 				break;
1923 			case RS_OP_IOMAP_SGL:
1924 				/* The iomap was updated, that's nice to know. */
1925 				break;
1926 			case RS_OP_CTRL:
1927 				if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) {
1928 					rs->state = rs_disconnected;
1929 					return 0;
1930 				} else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) {
1931 					if (rs->state & rs_writable) {
1932 						rs->state &= ~rs_readable;
1933 					} else {
1934 						rs->state = rs_disconnected;
1935 						return 0;
1936 					}
1937 				}
1938 				break;
1939 			case RS_OP_WRITE:
1940 				/* We really shouldn't be here. */
1941 				break;
1942 			default:
1943 				rs->rmsg[rs->rmsg_tail].op = rs_msg_op(msg);
1944 				rs->rmsg[rs->rmsg_tail].data = rs_msg_data(msg);
1945 				if (++rs->rmsg_tail == rs->rq_size + 1)
1946 					rs->rmsg_tail = 0;
1947 				break;
1948 			}
1949 		} else {
1950 			switch  (rs_msg_op(rs_wr_data(wc.wr_id))) {
1951 			case RS_OP_SGL:
1952 				rs->ctrl_max_seqno++;
1953 				break;
1954 			case RS_OP_CTRL:
1955 				rs->ctrl_max_seqno++;
1956 				if (rs_msg_data(rs_wr_data(wc.wr_id)) == RS_CTRL_DISCONNECT)
1957 					rs->state = rs_disconnected;
1958 				break;
1959 			case RS_OP_IOMAP_SGL:
1960 				rs->sqe_avail++;
1961 				if (!rs_wr_is_msg_send(wc.wr_id))
1962 					rs->sbuf_bytes_avail += sizeof(struct rs_iomap);
1963 				break;
1964 			default:
1965 				rs->sqe_avail++;
1966 				rs->sbuf_bytes_avail += rs_msg_data(rs_wr_data(wc.wr_id));
1967 				break;
1968 			}
1969 			if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) {
1970 				rs->state = rs_error;
1971 				rs->err = EIO;
1972 			}
1973 		}
1974 	}
1975 
1976 	if (rs->state & rs_connected) {
1977 		while (!ret && rcnt--)
1978 			ret = rs_post_recv(rs);
1979 
1980 		if (ret) {
1981 			rs->state = rs_error;
1982 			rs->err = errno;
1983 		}
1984 	}
1985 	return ret;
1986 }
1987 
1988 static int rs_get_cq_event(struct rsocket *rs)
1989 {
1990 	struct ibv_cq *cq;
1991 	void *context;
1992 	int ret;
1993 
1994 	if (!rs->cq_armed)
1995 		return 0;
1996 
1997 	ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
1998 	if (!ret) {
1999 		if (++rs->unack_cqe >= rs->sq_size + rs->rq_size) {
2000 			ibv_ack_cq_events(rs->cm_id->recv_cq, rs->unack_cqe);
2001 			rs->unack_cqe = 0;
2002 		}
2003 		rs->cq_armed = 0;
2004 	} else if (!(errno == EAGAIN || errno == EINTR)) {
2005 		rs->state = rs_error;
2006 	}
2007 
2008 	return ret;
2009 }
2010 
2011 /*
2012  * Although we serialize rsend and rrecv calls with respect to themselves,
2013  * both calls may run simultaneously and need to poll the CQ for completions.
2014  * We need to serialize access to the CQ, but rsend and rrecv need to
2015  * allow each other to make forward progress.
2016  *
2017  * For example, rsend may need to wait for credits from the remote side,
2018  * which could be stalled until the remote process calls rrecv.  This should
2019  * not block rrecv from receiving data from the remote side however.
2020  *
2021  * We handle this by using two locks.  The cq_lock protects against polling
2022  * the CQ and processing completions.  The cq_wait_lock serializes access to
2023  * waiting on the CQ.
2024  */
2025 static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
2026 {
2027 	int ret;
2028 
2029 	fastlock_acquire(&rs->cq_lock);
2030 	do {
2031 		rs_update_credits(rs);
2032 		ret = rs_poll_cq(rs);
2033 		if (test(rs)) {
2034 			ret = 0;
2035 			break;
2036 		} else if (ret) {
2037 			break;
2038 		} else if (nonblock) {
2039 			ret = ERR(EWOULDBLOCK);
2040 		} else if (!rs->cq_armed) {
2041 			ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
2042 			rs->cq_armed = 1;
2043 		} else {
2044 			rs_update_credits(rs);
2045 			fastlock_acquire(&rs->cq_wait_lock);
2046 			fastlock_release(&rs->cq_lock);
2047 
2048 			ret = rs_get_cq_event(rs);
2049 			fastlock_release(&rs->cq_wait_lock);
2050 			fastlock_acquire(&rs->cq_lock);
2051 		}
2052 	} while (!ret);
2053 
2054 	rs_update_credits(rs);
2055 	fastlock_release(&rs->cq_lock);
2056 	return ret;
2057 }
2058 
2059 static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
2060 {
2061 	struct timeval s, e;
2062 	uint32_t poll_time = 0;
2063 	int ret;
2064 
2065 	do {
2066 		ret = rs_process_cq(rs, 1, test);
2067 		if (!ret || nonblock || errno != EWOULDBLOCK)
2068 			return ret;
2069 
2070 		if (!poll_time)
2071 			gettimeofday(&s, NULL);
2072 
2073 		gettimeofday(&e, NULL);
2074 		poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
2075 			    (e.tv_usec - s.tv_usec) + 1;
2076 	} while (poll_time <= polling_time);
2077 
2078 	ret = rs_process_cq(rs, 0, test);
2079 	return ret;
2080 }
2081 
2082 static int ds_valid_recv(struct ds_qp *qp, struct ibv_wc *wc)
2083 {
2084 	struct ds_header *hdr;
2085 
2086 	hdr = (struct ds_header *) (qp->rbuf + rs_wr_data(wc->wr_id));
2087 	return ((wc->byte_len >= sizeof(struct ibv_grh) + DS_IPV4_HDR_LEN) &&
2088 		((hdr->version == 4 && hdr->length == DS_IPV4_HDR_LEN) ||
2089 		 (hdr->version == 6 && hdr->length == DS_IPV6_HDR_LEN)));
2090 }
2091 
2092 /*
2093  * Poll all CQs associated with a datagram rsocket.  We need to drop any
2094  * received messages that we do not have room to store.  To limit drops,
2095  * we only poll if we have room to store the receive or we need a send
2096  * buffer.  To ensure fairness, we poll the CQs round robin, remembering
2097  * where we left off.
2098  */
2099 static void ds_poll_cqs(struct rsocket *rs)
2100 {
2101 	struct ds_qp *qp;
2102 	struct ds_smsg *smsg;
2103 	struct ds_rmsg *rmsg;
2104 	struct ibv_wc wc;
2105 	int ret, cnt;
2106 
2107 	if (!(qp = rs->qp_list))
2108 		return;
2109 
2110 	do {
2111 		cnt = 0;
2112 		do {
2113 			ret = ibv_poll_cq(qp->cm_id->recv_cq, 1, &wc);
2114 			if (ret <= 0) {
2115 				qp = ds_next_qp(qp);
2116 				continue;
2117 			}
2118 
2119 			if (rs_wr_is_recv(wc.wr_id)) {
2120 				if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
2121 				    ds_valid_recv(qp, &wc)) {
2122 					rs->rqe_avail--;
2123 					rmsg = &rs->dmsg[rs->rmsg_tail];
2124 					rmsg->qp = qp;
2125 					rmsg->offset = rs_wr_data(wc.wr_id);
2126 					rmsg->length = wc.byte_len - sizeof(struct ibv_grh);
2127 					if (++rs->rmsg_tail == rs->rq_size + 1)
2128 						rs->rmsg_tail = 0;
2129 				} else {
2130 					ds_post_recv(rs, qp, rs_wr_data(wc.wr_id));
2131 				}
2132 			} else {
2133 				smsg = (struct ds_smsg *) (rs->sbuf + rs_wr_data(wc.wr_id));
2134 				smsg->next = rs->smsg_free;
2135 				rs->smsg_free = smsg;
2136 				rs->sqe_avail++;
2137 			}
2138 
2139 			qp = ds_next_qp(qp);
2140 			if (!rs->rqe_avail && rs->sqe_avail) {
2141 				rs->qp_list = qp;
2142 				return;
2143 			}
2144 			cnt++;
2145 		} while (qp != rs->qp_list);
2146 	} while (cnt);
2147 }
2148 
2149 static void ds_req_notify_cqs(struct rsocket *rs)
2150 {
2151 	struct ds_qp *qp;
2152 
2153 	if (!(qp = rs->qp_list))
2154 		return;
2155 
2156 	do {
2157 		if (!qp->cq_armed) {
2158 			ibv_req_notify_cq(qp->cm_id->recv_cq, 0);
2159 			qp->cq_armed = 1;
2160 		}
2161 		qp = ds_next_qp(qp);
2162 	} while (qp != rs->qp_list);
2163 }
2164 
2165 static int ds_get_cq_event(struct rsocket *rs)
2166 {
2167 	struct epoll_event event;
2168 	struct ds_qp *qp;
2169 	struct ibv_cq *cq;
2170 	void *context;
2171 	int ret;
2172 
2173 	if (!rs->cq_armed)
2174 		return 0;
2175 
2176 	ret = epoll_wait(rs->epfd, &event, 1, -1);
2177 	if (ret <= 0)
2178 		return ret;
2179 
2180 	qp = event.data.ptr;
2181 	ret = ibv_get_cq_event(qp->cm_id->recv_cq_channel, &cq, &context);
2182 	if (!ret) {
2183 		ibv_ack_cq_events(qp->cm_id->recv_cq, 1);
2184 		qp->cq_armed = 0;
2185 		rs->cq_armed = 0;
2186 	}
2187 
2188 	return ret;
2189 }
2190 
2191 static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
2192 {
2193 	int ret = 0;
2194 
2195 	fastlock_acquire(&rs->cq_lock);
2196 	do {
2197 		ds_poll_cqs(rs);
2198 		if (test(rs)) {
2199 			ret = 0;
2200 			break;
2201 		} else if (nonblock) {
2202 			ret = ERR(EWOULDBLOCK);
2203 		} else if (!rs->cq_armed) {
2204 			ds_req_notify_cqs(rs);
2205 			rs->cq_armed = 1;
2206 		} else {
2207 			fastlock_acquire(&rs->cq_wait_lock);
2208 			fastlock_release(&rs->cq_lock);
2209 
2210 			ret = ds_get_cq_event(rs);
2211 			fastlock_release(&rs->cq_wait_lock);
2212 			fastlock_acquire(&rs->cq_lock);
2213 		}
2214 	} while (!ret);
2215 
2216 	fastlock_release(&rs->cq_lock);
2217 	return ret;
2218 }
2219 
2220 static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
2221 {
2222 	struct timeval s, e;
2223 	uint32_t poll_time = 0;
2224 	int ret;
2225 
2226 	do {
2227 		ret = ds_process_cqs(rs, 1, test);
2228 		if (!ret || nonblock || errno != EWOULDBLOCK)
2229 			return ret;
2230 
2231 		if (!poll_time)
2232 			gettimeofday(&s, NULL);
2233 
2234 		gettimeofday(&e, NULL);
2235 		poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
2236 			    (e.tv_usec - s.tv_usec) + 1;
2237 	} while (poll_time <= polling_time);
2238 
2239 	ret = ds_process_cqs(rs, 0, test);
2240 	return ret;
2241 }
2242 
2243 static int rs_nonblocking(struct rsocket *rs, int flags)
2244 {
2245 	return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
2246 }
2247 
2248 static int rs_is_cq_armed(struct rsocket *rs)
2249 {
2250 	return rs->cq_armed;
2251 }
2252 
2253 static int rs_poll_all(struct rsocket *rs)
2254 {
2255 	return 1;
2256 }
2257 
2258 /*
2259  * We use hardware flow control to prevent over running the remote
2260  * receive queue.  However, data transfers still require space in
2261  * the remote rmsg queue, or we risk losing notification that data
2262  * has been transfered.
2263  *
2264  * Be careful with race conditions in the check below.  The target SGL
2265  * may be updated by a remote RDMA write.
2266  */
2267 static int rs_can_send(struct rsocket *rs)
2268 {
2269 	if (!(rs->opts & RS_OPT_MSG_SEND)) {
2270 		return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
2271 		       (rs->sseq_no != rs->sseq_comp) &&
2272 		       (rs->target_sgl[rs->target_sge].length != 0);
2273 	} else {
2274 		return (rs->sqe_avail >= 2) && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
2275 		       (rs->sseq_no != rs->sseq_comp) &&
2276 		       (rs->target_sgl[rs->target_sge].length != 0);
2277 	}
2278 }
2279 
2280 static int ds_can_send(struct rsocket *rs)
2281 {
2282 	return rs->sqe_avail;
2283 }
2284 
2285 static int ds_all_sends_done(struct rsocket *rs)
2286 {
2287 	return rs->sqe_avail == rs->sq_size;
2288 }
2289 
2290 static int rs_conn_can_send(struct rsocket *rs)
2291 {
2292 	return rs_can_send(rs) || !(rs->state & rs_writable);
2293 }
2294 
2295 static int rs_conn_can_send_ctrl(struct rsocket *rs)
2296 {
2297 	return rs_ctrl_avail(rs) || !(rs->state & rs_connected);
2298 }
2299 
2300 static int rs_have_rdata(struct rsocket *rs)
2301 {
2302 	return (rs->rmsg_head != rs->rmsg_tail);
2303 }
2304 
2305 static int rs_conn_have_rdata(struct rsocket *rs)
2306 {
2307 	return rs_have_rdata(rs) || !(rs->state & rs_readable);
2308 }
2309 
2310 static int rs_conn_all_sends_done(struct rsocket *rs)
2311 {
2312 	return ((((int) rs->ctrl_max_seqno) - ((int) rs->ctrl_seqno)) +
2313 		rs->sqe_avail == rs->sq_size) ||
2314 	       !(rs->state & rs_connected);
2315 }
2316 
2317 static void ds_set_src(struct sockaddr *addr, socklen_t *addrlen,
2318 		       struct ds_header *hdr)
2319 {
2320 	union socket_addr sa;
2321 
2322 	memset(&sa, 0, sizeof sa);
2323 	if (hdr->version == 4) {
2324 		if (*addrlen > sizeof(sa.sin))
2325 			*addrlen = sizeof(sa.sin);
2326 
2327 		sa.sin.sin_family = AF_INET;
2328 		sa.sin.sin_port = hdr->port;
2329 		sa.sin.sin_addr.s_addr =  hdr->addr.ipv4;
2330 	} else {
2331 		if (*addrlen > sizeof(sa.sin6))
2332 			*addrlen = sizeof(sa.sin6);
2333 
2334 		sa.sin6.sin6_family = AF_INET6;
2335 		sa.sin6.sin6_port = hdr->port;
2336 		sa.sin6.sin6_flowinfo = hdr->addr.ipv6.flowinfo;
2337 		memcpy(&sa.sin6.sin6_addr, &hdr->addr.ipv6.addr, 16);
2338 	}
2339 	memcpy(addr, &sa, *addrlen);
2340 }
2341 
2342 static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
2343 			   struct sockaddr *src_addr, socklen_t *addrlen)
2344 {
2345 	struct ds_rmsg *rmsg;
2346 	struct ds_header *hdr;
2347 	int ret;
2348 
2349 	if (!(rs->state & rs_readable))
2350 		return ERR(EINVAL);
2351 
2352 	if (!rs_have_rdata(rs)) {
2353 		ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
2354 				  rs_have_rdata);
2355 		if (ret)
2356 			return ret;
2357 	}
2358 
2359 	rmsg = &rs->dmsg[rs->rmsg_head];
2360 	hdr = (struct ds_header *) (rmsg->qp->rbuf + rmsg->offset);
2361 	if (len > rmsg->length - hdr->length)
2362 		len = rmsg->length - hdr->length;
2363 
2364 	memcpy(buf, (void *) hdr + hdr->length, len);
2365 	if (addrlen)
2366 		ds_set_src(src_addr, addrlen, hdr);
2367 
2368 	if (!(flags & MSG_PEEK)) {
2369 		ds_post_recv(rs, rmsg->qp, rmsg->offset);
2370 		if (++rs->rmsg_head == rs->rq_size + 1)
2371 			rs->rmsg_head = 0;
2372 		rs->rqe_avail++;
2373 	}
2374 
2375 	return len;
2376 }
2377 
2378 static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
2379 {
2380 	size_t left = len;
2381 	uint32_t end_size, rsize;
2382 	int rmsg_head, rbuf_offset;
2383 
2384 	rmsg_head = rs->rmsg_head;
2385 	rbuf_offset = rs->rbuf_offset;
2386 
2387 	for (; left && (rmsg_head != rs->rmsg_tail); left -= rsize) {
2388 		if (left < rs->rmsg[rmsg_head].data) {
2389 			rsize = left;
2390 		} else {
2391 			rsize = rs->rmsg[rmsg_head].data;
2392 			if (++rmsg_head == rs->rq_size + 1)
2393 				rmsg_head = 0;
2394 		}
2395 
2396 		end_size = rs->rbuf_size - rbuf_offset;
2397 		if (rsize > end_size) {
2398 			memcpy(buf, &rs->rbuf[rbuf_offset], end_size);
2399 			rbuf_offset = 0;
2400 			buf += end_size;
2401 			rsize -= end_size;
2402 			left -= end_size;
2403 		}
2404 		memcpy(buf, &rs->rbuf[rbuf_offset], rsize);
2405 		rbuf_offset += rsize;
2406 		buf += rsize;
2407 	}
2408 
2409 	return len - left;
2410 }
2411 
2412 /*
2413  * Continue to receive any queued data even if the remote side has disconnected.
2414  */
2415 ssize_t rrecv(int socket, void *buf, size_t len, int flags)
2416 {
2417 	struct rsocket *rs;
2418 	size_t left = len;
2419 	uint32_t end_size, rsize;
2420 	int ret = 0;
2421 
2422 	rs = idm_at(&idm, socket);
2423 	if (rs->type == SOCK_DGRAM) {
2424 		fastlock_acquire(&rs->rlock);
2425 		ret = ds_recvfrom(rs, buf, len, flags, NULL, NULL);
2426 		fastlock_release(&rs->rlock);
2427 		return ret;
2428 	}
2429 
2430 	if (rs->state & rs_opening) {
2431 		ret = rs_do_connect(rs);
2432 		if (ret) {
2433 			if (errno == EINPROGRESS)
2434 				errno = EAGAIN;
2435 			return ret;
2436 		}
2437 	}
2438 	fastlock_acquire(&rs->rlock);
2439 	do {
2440 		if (!rs_have_rdata(rs)) {
2441 			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
2442 					  rs_conn_have_rdata);
2443 			if (ret)
2444 				break;
2445 		}
2446 
2447 		if (flags & MSG_PEEK) {
2448 			left = len - rs_peek(rs, buf, left);
2449 			break;
2450 		}
2451 
2452 		for (; left && rs_have_rdata(rs); left -= rsize) {
2453 			if (left < rs->rmsg[rs->rmsg_head].data) {
2454 				rsize = left;
2455 				rs->rmsg[rs->rmsg_head].data -= left;
2456 			} else {
2457 				rs->rseq_no++;
2458 				rsize = rs->rmsg[rs->rmsg_head].data;
2459 				if (++rs->rmsg_head == rs->rq_size + 1)
2460 					rs->rmsg_head = 0;
2461 			}
2462 
2463 			end_size = rs->rbuf_size - rs->rbuf_offset;
2464 			if (rsize > end_size) {
2465 				memcpy(buf, &rs->rbuf[rs->rbuf_offset], end_size);
2466 				rs->rbuf_offset = 0;
2467 				buf += end_size;
2468 				rsize -= end_size;
2469 				left -= end_size;
2470 				rs->rbuf_bytes_avail += end_size;
2471 			}
2472 			memcpy(buf, &rs->rbuf[rs->rbuf_offset], rsize);
2473 			rs->rbuf_offset += rsize;
2474 			buf += rsize;
2475 			rs->rbuf_bytes_avail += rsize;
2476 		}
2477 
2478 	} while (left && (flags & MSG_WAITALL) && (rs->state & rs_readable));
2479 
2480 	fastlock_release(&rs->rlock);
2481 	return (ret && left == len) ? ret : len - left;
2482 }
2483 
2484 ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
2485 		  struct sockaddr *src_addr, socklen_t *addrlen)
2486 {
2487 	struct rsocket *rs;
2488 	int ret;
2489 
2490 	rs = idm_at(&idm, socket);
2491 	if (rs->type == SOCK_DGRAM) {
2492 		fastlock_acquire(&rs->rlock);
2493 		ret = ds_recvfrom(rs, buf, len, flags, src_addr, addrlen);
2494 		fastlock_release(&rs->rlock);
2495 		return ret;
2496 	}
2497 
2498 	ret = rrecv(socket, buf, len, flags);
2499 	if (ret > 0 && src_addr)
2500 		rgetpeername(socket, src_addr, addrlen);
2501 
2502 	return ret;
2503 }
2504 
2505 /*
2506  * Simple, straightforward implementation for now that only tries to fill
2507  * in the first vector.
2508  */
2509 static ssize_t rrecvv(int socket, const struct iovec *iov, int iovcnt, int flags)
2510 {
2511 	return rrecv(socket, iov[0].iov_base, iov[0].iov_len, flags);
2512 }
2513 
2514 ssize_t rrecvmsg(int socket, struct msghdr *msg, int flags)
2515 {
2516 	if (msg->msg_control && msg->msg_controllen)
2517 		return ERR(ENOTSUP);
2518 
2519 	return rrecvv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags);
2520 }
2521 
2522 ssize_t rread(int socket, void *buf, size_t count)
2523 {
2524 	return rrecv(socket, buf, count, 0);
2525 }
2526 
2527 ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
2528 {
2529 	return rrecvv(socket, iov, iovcnt, 0);
2530 }
2531 
2532 static int rs_send_iomaps(struct rsocket *rs, int flags)
2533 {
2534 	struct rs_iomap_mr *iomr;
2535 	struct ibv_sge sge;
2536 	struct rs_iomap iom;
2537 	int ret;
2538 
2539 	fastlock_acquire(&rs->map_lock);
2540 	while (!dlist_empty(&rs->iomap_queue)) {
2541 		if (!rs_can_send(rs)) {
2542 			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
2543 					  rs_conn_can_send);
2544 			if (ret)
2545 				break;
2546 			if (!(rs->state & rs_writable)) {
2547 				ret = ERR(ECONNRESET);
2548 				break;
2549 			}
2550 		}
2551 
2552 		iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
2553 		if (!(rs->opts & RS_OPT_SWAP_SGL)) {
2554 			iom.offset = iomr->offset;
2555 			iom.sge.addr = (uintptr_t) iomr->mr->addr;
2556 			iom.sge.length = iomr->mr->length;
2557 			iom.sge.key = iomr->mr->rkey;
2558 		} else {
2559 			iom.offset = bswap_64(iomr->offset);
2560 			iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
2561 			iom.sge.length = bswap_32(iomr->mr->length);
2562 			iom.sge.key = bswap_32(iomr->mr->rkey);
2563 		}
2564 
2565 		if (rs->sq_inline >= sizeof iom) {
2566 			sge.addr = (uintptr_t) &iom;
2567 			sge.length = sizeof iom;
2568 			sge.lkey = 0;
2569 			ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
2570 		} else if (rs_sbuf_left(rs) >= sizeof iom) {
2571 			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
2572 			rs->ssgl[0].length = sizeof iom;
2573 			ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
2574 			if (rs_sbuf_left(rs) > sizeof iom)
2575 				rs->ssgl[0].addr += sizeof iom;
2576 			else
2577 				rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
2578 		} else {
2579 			rs->ssgl[0].length = rs_sbuf_left(rs);
2580 			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
2581 				rs->ssgl[0].length);
2582 			rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
2583 			memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
2584 			       rs->ssgl[1].length);
2585 			ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
2586 			rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
2587 		}
2588 		dlist_remove(&iomr->entry);
2589 		dlist_insert_tail(&iomr->entry, &rs->iomap_list);
2590 		if (ret)
2591 			break;
2592 	}
2593 
2594 	rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
2595 	fastlock_release(&rs->map_lock);
2596 	return ret;
2597 }
2598 
2599 static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
2600 			    int iovcnt, int flags, uint8_t op)
2601 {
2602 	struct ds_udp_header hdr;
2603 	struct msghdr msg;
2604 	struct iovec miov[8];
2605 	ssize_t ret;
2606 
2607 	if (iovcnt > 8)
2608 		return ERR(ENOTSUP);
2609 
2610 	hdr.tag = htobe32(DS_UDP_TAG);
2611 	hdr.version = rs->conn_dest->qp->hdr.version;
2612 	hdr.op = op;
2613 	hdr.reserved = 0;
2614 	hdr.qpn = htobe32(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
2615 	if (rs->conn_dest->qp->hdr.version == 4) {
2616 		hdr.length = DS_UDP_IPV4_HDR_LEN;
2617 		hdr.addr.ipv4 = rs->conn_dest->qp->hdr.addr.ipv4;
2618 	} else {
2619 		hdr.length = DS_UDP_IPV6_HDR_LEN;
2620 		memcpy(hdr.addr.ipv6, &rs->conn_dest->qp->hdr.addr.ipv6, 16);
2621 	}
2622 
2623 	miov[0].iov_base = &hdr;
2624 	miov[0].iov_len = hdr.length;
2625 	if (iov && iovcnt)
2626 		memcpy(&miov[1], iov, sizeof(*iov) * iovcnt);
2627 
2628 	memset(&msg, 0, sizeof msg);
2629 	msg.msg_name = &rs->conn_dest->addr;
2630 	msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
2631 	msg.msg_iov = miov;
2632 	msg.msg_iovlen = iovcnt + 1;
2633 	ret = sendmsg(rs->udp_sock, &msg, flags);
2634 	return ret > 0 ? ret - hdr.length : ret;
2635 }
2636 
2637 static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
2638 			   int flags, uint8_t op)
2639 {
2640 	struct iovec iov;
2641 	if (buf && len) {
2642 		iov.iov_base = (void *) buf;
2643 		iov.iov_len = len;
2644 		return ds_sendv_udp(rs, &iov, 1, flags, op);
2645 	} else {
2646 		return ds_sendv_udp(rs, NULL, 0, flags, op);
2647 	}
2648 }
2649 
2650 static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
2651 {
2652 	struct ds_smsg *msg;
2653 	struct ibv_sge sge;
2654 	uint64_t offset;
2655 	int ret = 0;
2656 
2657 	if (!rs->conn_dest->ah)
2658 		return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
2659 
2660 	if (!ds_can_send(rs)) {
2661 		ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
2662 		if (ret)
2663 			return ret;
2664 	}
2665 
2666 	msg = rs->smsg_free;
2667 	rs->smsg_free = msg->next;
2668 	rs->sqe_avail--;
2669 
2670 	memcpy((void *) msg, &rs->conn_dest->qp->hdr, rs->conn_dest->qp->hdr.length);
2671 	memcpy((void *) msg + rs->conn_dest->qp->hdr.length, buf, len);
2672 	sge.addr = (uintptr_t) msg;
2673 	sge.length = rs->conn_dest->qp->hdr.length + len;
2674 	sge.lkey = rs->conn_dest->qp->smr->lkey;
2675 	offset = (uint8_t *) msg - rs->sbuf;
2676 
2677 	ret = ds_post_send(rs, &sge, offset);
2678 	return ret ? ret : len;
2679 }
2680 
2681 /*
2682  * We overlap sending the data, by posting a small work request immediately,
2683  * then increasing the size of the send on each iteration.
2684  */
2685 ssize_t rsend(int socket, const void *buf, size_t len, int flags)
2686 {
2687 	struct rsocket *rs;
2688 	struct ibv_sge sge;
2689 	size_t left = len;
2690 	uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
2691 	int ret = 0;
2692 
2693 	rs = idm_at(&idm, socket);
2694 	if (rs->type == SOCK_DGRAM) {
2695 		fastlock_acquire(&rs->slock);
2696 		ret = dsend(rs, buf, len, flags);
2697 		fastlock_release(&rs->slock);
2698 		return ret;
2699 	}
2700 
2701 	if (rs->state & rs_opening) {
2702 		ret = rs_do_connect(rs);
2703 		if (ret) {
2704 			if (errno == EINPROGRESS)
2705 				errno = EAGAIN;
2706 			return ret;
2707 		}
2708 	}
2709 
2710 	fastlock_acquire(&rs->slock);
2711 	if (rs->iomap_pending) {
2712 		ret = rs_send_iomaps(rs, flags);
2713 		if (ret)
2714 			goto out;
2715 	}
2716 	for (; left; left -= xfer_size, buf += xfer_size) {
2717 		if (!rs_can_send(rs)) {
2718 			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
2719 					  rs_conn_can_send);
2720 			if (ret)
2721 				break;
2722 			if (!(rs->state & rs_writable)) {
2723 				ret = ERR(ECONNRESET);
2724 				break;
2725 			}
2726 		}
2727 
2728 		if (olen < left) {
2729 			xfer_size = olen;
2730 			if (olen < RS_MAX_TRANSFER)
2731 				olen <<= 1;
2732 		} else {
2733 			xfer_size = left;
2734 		}
2735 
2736 		if (xfer_size > rs->sbuf_bytes_avail)
2737 			xfer_size = rs->sbuf_bytes_avail;
2738 		if (xfer_size > rs->target_sgl[rs->target_sge].length)
2739 			xfer_size = rs->target_sgl[rs->target_sge].length;
2740 
2741 		if (xfer_size <= rs->sq_inline) {
2742 			sge.addr = (uintptr_t) buf;
2743 			sge.length = xfer_size;
2744 			sge.lkey = 0;
2745 			ret = rs_write_data(rs, &sge, 1, xfer_size, IBV_SEND_INLINE);
2746 		} else if (xfer_size <= rs_sbuf_left(rs)) {
2747 			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
2748 			rs->ssgl[0].length = xfer_size;
2749 			ret = rs_write_data(rs, rs->ssgl, 1, xfer_size, 0);
2750 			if (xfer_size < rs_sbuf_left(rs))
2751 				rs->ssgl[0].addr += xfer_size;
2752 			else
2753 				rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
2754 		} else {
2755 			rs->ssgl[0].length = rs_sbuf_left(rs);
2756 			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
2757 				rs->ssgl[0].length);
2758 			rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
2759 			memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
2760 			ret = rs_write_data(rs, rs->ssgl, 2, xfer_size, 0);
2761 			rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
2762 		}
2763 		if (ret)
2764 			break;
2765 	}
2766 out:
2767 	fastlock_release(&rs->slock);
2768 
2769 	return (ret && left == len) ? ret : len - left;
2770 }
2771 
2772 ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
2773 		const struct sockaddr *dest_addr, socklen_t addrlen)
2774 {
2775 	struct rsocket *rs;
2776 	int ret;
2777 
2778 	rs = idm_at(&idm, socket);
2779 	if (rs->type == SOCK_STREAM) {
2780 		if (dest_addr || addrlen)
2781 			return ERR(EISCONN);
2782 
2783 		return rsend(socket, buf, len, flags);
2784 	}
2785 
2786 	if (rs->state == rs_init) {
2787 		ret = ds_init_ep(rs);
2788 		if (ret)
2789 			return ret;
2790 	}
2791 
2792 	fastlock_acquire(&rs->slock);
2793 	if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
2794 		ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
2795 		if (ret)
2796 			goto out;
2797 	}
2798 
2799 	ret = dsend(rs, buf, len, flags);
2800 out:
2801 	fastlock_release(&rs->slock);
2802 	return ret;
2803 }
2804 
2805 static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
2806 {
2807 	size_t size;
2808 
2809 	while (len) {
2810 		size = (*iov)->iov_len - *offset;
2811 		if (size > len) {
2812 			memcpy (dst, (*iov)->iov_base + *offset, len);
2813 			*offset += len;
2814 			break;
2815 		}
2816 
2817 		memcpy(dst, (*iov)->iov_base + *offset, size);
2818 		len -= size;
2819 		dst += size;
2820 		(*iov)++;
2821 		*offset = 0;
2822 	}
2823 }
2824 
2825 static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags)
2826 {
2827 	struct rsocket *rs;
2828 	const struct iovec *cur_iov;
2829 	size_t left, len, offset = 0;
2830 	uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
2831 	int i, ret = 0;
2832 
2833 	rs = idm_at(&idm, socket);
2834 	if (rs->state & rs_opening) {
2835 		ret = rs_do_connect(rs);
2836 		if (ret) {
2837 			if (errno == EINPROGRESS)
2838 				errno = EAGAIN;
2839 			return ret;
2840 		}
2841 	}
2842 
2843 	cur_iov = iov;
2844 	len = iov[0].iov_len;
2845 	for (i = 1; i < iovcnt; i++)
2846 		len += iov[i].iov_len;
2847 	left = len;
2848 
2849 	fastlock_acquire(&rs->slock);
2850 	if (rs->iomap_pending) {
2851 		ret = rs_send_iomaps(rs, flags);
2852 		if (ret)
2853 			goto out;
2854 	}
2855 	for (; left; left -= xfer_size) {
2856 		if (!rs_can_send(rs)) {
2857 			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
2858 					  rs_conn_can_send);
2859 			if (ret)
2860 				break;
2861 			if (!(rs->state & rs_writable)) {
2862 				ret = ERR(ECONNRESET);
2863 				break;
2864 			}
2865 		}
2866 
2867 		if (olen < left) {
2868 			xfer_size = olen;
2869 			if (olen < RS_MAX_TRANSFER)
2870 				olen <<= 1;
2871 		} else {
2872 			xfer_size = left;
2873 		}
2874 
2875 		if (xfer_size > rs->sbuf_bytes_avail)
2876 			xfer_size = rs->sbuf_bytes_avail;
2877 		if (xfer_size > rs->target_sgl[rs->target_sge].length)
2878 			xfer_size = rs->target_sgl[rs->target_sge].length;
2879 
2880 		if (xfer_size <= rs_sbuf_left(rs)) {
2881 			rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr,
2882 				    &cur_iov, &offset, xfer_size);
2883 			rs->ssgl[0].length = xfer_size;
2884 			ret = rs_write_data(rs, rs->ssgl, 1, xfer_size,
2885 					    xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
2886 			if (xfer_size < rs_sbuf_left(rs))
2887 				rs->ssgl[0].addr += xfer_size;
2888 			else
2889 				rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
2890 		} else {
2891 			rs->ssgl[0].length = rs_sbuf_left(rs);
2892 			rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr, &cur_iov,
2893 				    &offset, rs->ssgl[0].length);
2894 			rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
2895 			rs_copy_iov(rs->sbuf, &cur_iov, &offset, rs->ssgl[1].length);
2896 			ret = rs_write_data(rs, rs->ssgl, 2, xfer_size,
2897 					    xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
2898 			rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
2899 		}
2900 		if (ret)
2901 			break;
2902 	}
2903 out:
2904 	fastlock_release(&rs->slock);
2905 
2906 	return (ret && left == len) ? ret : len - left;
2907 }
2908 
2909 ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
2910 {
2911 	if (msg->msg_control && msg->msg_controllen)
2912 		return ERR(ENOTSUP);
2913 
2914 	return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, flags);
2915 }
2916 
2917 ssize_t rwrite(int socket, const void *buf, size_t count)
2918 {
2919 	return rsend(socket, buf, count, 0);
2920 }
2921 
2922 ssize_t rwritev(int socket, const struct iovec *iov, int iovcnt)
2923 {
2924 	return rsendv(socket, iov, iovcnt, 0);
2925 }
2926 
2927 static struct pollfd *rs_fds_alloc(nfds_t nfds)
2928 {
2929 	static __thread struct pollfd *rfds;
2930 	static __thread nfds_t rnfds;
2931 
2932 	if (nfds > rnfds) {
2933 		if (rfds)
2934 			free(rfds);
2935 
2936 		rfds = malloc(sizeof(*rfds) * nfds);
2937 		rnfds = rfds ? nfds : 0;
2938 	}
2939 
2940 	return rfds;
2941 }
2942 
2943 static int rs_poll_rs(struct rsocket *rs, int events,
2944 		      int nonblock, int (*test)(struct rsocket *rs))
2945 {
2946 	struct pollfd fds;
2947 	short revents;
2948 	int ret;
2949 
2950 check_cq:
2951 	if ((rs->type == SOCK_STREAM) && ((rs->state & rs_connected) ||
2952 	     (rs->state == rs_disconnected) || (rs->state & rs_error))) {
2953 		rs_process_cq(rs, nonblock, test);
2954 
2955 		revents = 0;
2956 		if ((events & POLLIN) && rs_conn_have_rdata(rs))
2957 			revents |= POLLIN;
2958 		if ((events & POLLOUT) && rs_can_send(rs))
2959 			revents |= POLLOUT;
2960 		if (!(rs->state & rs_connected)) {
2961 			if (rs->state == rs_disconnected)
2962 				revents |= POLLHUP;
2963 			else
2964 				revents |= POLLERR;
2965 		}
2966 
2967 		return revents;
2968 	} else if (rs->type == SOCK_DGRAM) {
2969 		ds_process_cqs(rs, nonblock, test);
2970 
2971 		revents = 0;
2972 		if ((events & POLLIN) && rs_have_rdata(rs))
2973 			revents |= POLLIN;
2974 		if ((events & POLLOUT) && ds_can_send(rs))
2975 			revents |= POLLOUT;
2976 
2977 		return revents;
2978 	}
2979 
2980 	if (rs->state == rs_listening) {
2981 		fds.fd = rs->cm_id->channel->fd;
2982 		fds.events = events;
2983 		fds.revents = 0;
2984 		poll(&fds, 1, 0);
2985 		return fds.revents;
2986 	}
2987 
2988 	if (rs->state & rs_opening) {
2989 		ret = rs_do_connect(rs);
2990 		if (ret && (errno == EINPROGRESS)) {
2991 			errno = 0;
2992 		} else {
2993 			goto check_cq;
2994 		}
2995 	}
2996 
2997 	if (rs->state == rs_connect_error) {
2998 		revents = 0;
2999 		if (events & POLLOUT)
3000 			revents |= POLLOUT;
3001 		if (events & POLLIN)
3002 			revents |= POLLIN;
3003 		revents |= POLLERR;
3004 		return revents;
3005 	}
3006 
3007 	return 0;
3008 }
3009 
3010 static int rs_poll_check(struct pollfd *fds, nfds_t nfds)
3011 {
3012 	struct rsocket *rs;
3013 	int i, cnt = 0;
3014 
3015 	for (i = 0; i < nfds; i++) {
3016 		rs = idm_lookup(&idm, fds[i].fd);
3017 		if (rs)
3018 			fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
3019 		else
3020 			poll(&fds[i], 1, 0);
3021 
3022 		if (fds[i].revents)
3023 			cnt++;
3024 	}
3025 	return cnt;
3026 }
3027 
3028 static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
3029 {
3030 	struct rsocket *rs;
3031 	int i;
3032 
3033 	for (i = 0; i < nfds; i++) {
3034 		rs = idm_lookup(&idm, fds[i].fd);
3035 		if (rs) {
3036 			fds[i].revents = rs_poll_rs(rs, fds[i].events, 0, rs_is_cq_armed);
3037 			if (fds[i].revents)
3038 				return 1;
3039 
3040 			if (rs->type == SOCK_STREAM) {
3041 				if (rs->state >= rs_connected)
3042 					rfds[i].fd = rs->cm_id->recv_cq_channel->fd;
3043 				else
3044 					rfds[i].fd = rs->cm_id->channel->fd;
3045 			} else {
3046 				rfds[i].fd = rs->epfd;
3047 			}
3048 			rfds[i].events = POLLIN;
3049 		} else {
3050 			rfds[i].fd = fds[i].fd;
3051 			rfds[i].events = fds[i].events;
3052 		}
3053 		rfds[i].revents = 0;
3054 	}
3055 	return 0;
3056 }
3057 
3058 static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
3059 {
3060 	struct rsocket *rs;
3061 	int i, cnt = 0;
3062 
3063 	for (i = 0; i < nfds; i++) {
3064 		if (!rfds[i].revents)
3065 			continue;
3066 
3067 		rs = idm_lookup(&idm, fds[i].fd);
3068 		if (rs) {
3069 			fastlock_acquire(&rs->cq_wait_lock);
3070 			if (rs->type == SOCK_STREAM)
3071 				rs_get_cq_event(rs);
3072 			else
3073 				ds_get_cq_event(rs);
3074 			fastlock_release(&rs->cq_wait_lock);
3075 			fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
3076 		} else {
3077 			fds[i].revents = rfds[i].revents;
3078 		}
3079 		if (fds[i].revents)
3080 			cnt++;
3081 	}
3082 	return cnt;
3083 }
3084 
3085 /*
3086  * We need to poll *all* fd's that the user specifies at least once.
3087  * Note that we may receive events on an rsocket that may not be reported
3088  * to the user (e.g. connection events or credit updates).  Process those
3089  * events, then return to polling until we find ones of interest.
3090  */
3091 int rpoll(struct pollfd *fds, nfds_t nfds, int timeout)
3092 {
3093 	struct timeval s, e;
3094 	struct pollfd *rfds;
3095 	uint32_t poll_time = 0;
3096 	int ret;
3097 
3098 	do {
3099 		ret = rs_poll_check(fds, nfds);
3100 		if (ret || !timeout)
3101 			return ret;
3102 
3103 		if (!poll_time)
3104 			gettimeofday(&s, NULL);
3105 
3106 		gettimeofday(&e, NULL);
3107 		poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
3108 			    (e.tv_usec - s.tv_usec) + 1;
3109 	} while (poll_time <= polling_time);
3110 
3111 	rfds = rs_fds_alloc(nfds);
3112 	if (!rfds)
3113 		return ERR(ENOMEM);
3114 
3115 	do {
3116 		ret = rs_poll_arm(rfds, fds, nfds);
3117 		if (ret)
3118 			break;
3119 
3120 		ret = poll(rfds, nfds, timeout);
3121 		if (ret <= 0)
3122 			break;
3123 
3124 		ret = rs_poll_events(rfds, fds, nfds);
3125 	} while (!ret);
3126 
3127 	return ret;
3128 }
3129 
3130 static struct pollfd *
3131 rs_select_to_poll(int *nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds)
3132 {
3133 	struct pollfd *fds;
3134 	int fd, i = 0;
3135 
3136 	fds = calloc(*nfds, sizeof(*fds));
3137 	if (!fds)
3138 		return NULL;
3139 
3140 	for (fd = 0; fd < *nfds; fd++) {
3141 		if (readfds && FD_ISSET(fd, readfds)) {
3142 			fds[i].fd = fd;
3143 			fds[i].events = POLLIN;
3144 		}
3145 
3146 		if (writefds && FD_ISSET(fd, writefds)) {
3147 			fds[i].fd = fd;
3148 			fds[i].events |= POLLOUT;
3149 		}
3150 
3151 		if (exceptfds && FD_ISSET(fd, exceptfds))
3152 			fds[i].fd = fd;
3153 
3154 		if (fds[i].fd)
3155 			i++;
3156 	}
3157 
3158 	*nfds = i;
3159 	return fds;
3160 }
3161 
3162 static int
3163 rs_poll_to_select(int nfds, struct pollfd *fds, fd_set *readfds,
3164 		  fd_set *writefds, fd_set *exceptfds)
3165 {
3166 	int i, cnt = 0;
3167 
3168 	for (i = 0; i < nfds; i++) {
3169 		if (readfds && (fds[i].revents & (POLLIN | POLLHUP))) {
3170 			FD_SET(fds[i].fd, readfds);
3171 			cnt++;
3172 		}
3173 
3174 		if (writefds && (fds[i].revents & POLLOUT)) {
3175 			FD_SET(fds[i].fd, writefds);
3176 			cnt++;
3177 		}
3178 
3179 		if (exceptfds && (fds[i].revents & ~(POLLIN | POLLOUT))) {
3180 			FD_SET(fds[i].fd, exceptfds);
3181 			cnt++;
3182 		}
3183 	}
3184 	return cnt;
3185 }
3186 
3187 static int rs_convert_timeout(struct timeval *timeout)
3188 {
3189 	return !timeout ? -1 :
3190 		timeout->tv_sec * 1000 + timeout->tv_usec / 1000;
3191 }
3192 
3193 int rselect(int nfds, fd_set *readfds, fd_set *writefds,
3194 	    fd_set *exceptfds, struct timeval *timeout)
3195 {
3196 	struct pollfd *fds;
3197 	int ret;
3198 
3199 	fds = rs_select_to_poll(&nfds, readfds, writefds, exceptfds);
3200 	if (!fds)
3201 		return ERR(ENOMEM);
3202 
3203 	ret = rpoll(fds, nfds, rs_convert_timeout(timeout));
3204 
3205 	if (readfds)
3206 		FD_ZERO(readfds);
3207 	if (writefds)
3208 		FD_ZERO(writefds);
3209 	if (exceptfds)
3210 		FD_ZERO(exceptfds);
3211 
3212 	if (ret > 0)
3213 		ret = rs_poll_to_select(nfds, fds, readfds, writefds, exceptfds);
3214 
3215 	free(fds);
3216 	return ret;
3217 }
3218 
3219 /*
3220  * For graceful disconnect, notify the remote side that we're
3221  * disconnecting and wait until all outstanding sends complete, provided
3222  * that the remote side has not sent a disconnect message.
3223  */
3224 int rshutdown(int socket, int how)
3225 {
3226 	struct rsocket *rs;
3227 	int ctrl, ret = 0;
3228 
3229 	rs = idm_lookup(&idm, socket);
3230 	if (!rs)
3231 		return ERR(EBADF);
3232 	if (rs->opts & RS_OPT_SVC_ACTIVE)
3233 		rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
3234 
3235 	if (rs->fd_flags & O_NONBLOCK)
3236 		rs_set_nonblocking(rs, 0);
3237 
3238 	if (rs->state & rs_connected) {
3239 		if (how == SHUT_RDWR) {
3240 			ctrl = RS_CTRL_DISCONNECT;
3241 			rs->state &= ~(rs_readable | rs_writable);
3242 		} else if (how == SHUT_WR) {
3243 			rs->state &= ~rs_writable;
3244 			ctrl = (rs->state & rs_readable) ?
3245 				RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
3246 		} else {
3247 			rs->state &= ~rs_readable;
3248 			if (rs->state & rs_writable)
3249 				goto out;
3250 			ctrl = RS_CTRL_DISCONNECT;
3251 		}
3252 		if (!rs_ctrl_avail(rs)) {
3253 			ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl);
3254 			if (ret)
3255 				goto out;
3256 		}
3257 
3258 		if ((rs->state & rs_connected) && rs_ctrl_avail(rs)) {
3259 			rs->ctrl_seqno++;
3260 			ret = rs_post_msg(rs, rs_msg_set(RS_OP_CTRL, ctrl));
3261 		}
3262 	}
3263 
3264 	if (rs->state & rs_connected)
3265 		rs_process_cq(rs, 0, rs_conn_all_sends_done);
3266 
3267 out:
3268 	if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
3269 		rs_set_nonblocking(rs, rs->fd_flags);
3270 
3271 	if (rs->state & rs_disconnected) {
3272 		/* Generate event by flushing receives to unblock rpoll */
3273 		ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
3274 		ucma_shutdown(rs->cm_id);
3275 	}
3276 
3277 	return ret;
3278 }
3279 
3280 static void ds_shutdown(struct rsocket *rs)
3281 {
3282 	if (rs->opts & RS_OPT_SVC_ACTIVE)
3283 		rs_notify_svc(&udp_svc, rs, RS_SVC_REM_DGRAM);
3284 
3285 	if (rs->fd_flags & O_NONBLOCK)
3286 		rs_set_nonblocking(rs, 0);
3287 
3288 	rs->state &= ~(rs_readable | rs_writable);
3289 	ds_process_cqs(rs, 0, ds_all_sends_done);
3290 
3291 	if (rs->fd_flags & O_NONBLOCK)
3292 		rs_set_nonblocking(rs, rs->fd_flags);
3293 }
3294 
3295 int rclose(int socket)
3296 {
3297 	struct rsocket *rs;
3298 
3299 	rs = idm_lookup(&idm, socket);
3300 	if (!rs)
3301 		return EBADF;
3302 	if (rs->type == SOCK_STREAM) {
3303 		if (rs->state & rs_connected)
3304 			rshutdown(socket, SHUT_RDWR);
3305 		else if (rs->opts & RS_OPT_SVC_ACTIVE)
3306 			rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
3307 	} else {
3308 		ds_shutdown(rs);
3309 	}
3310 
3311 	rs_free(rs);
3312 	return 0;
3313 }
3314 
3315 static void rs_copy_addr(struct sockaddr *dst, struct sockaddr *src, socklen_t *len)
3316 {
3317 	socklen_t size;
3318 
3319 	if (src->sa_family == AF_INET) {
3320 		size = min_t(socklen_t, *len, sizeof(struct sockaddr_in));
3321 		*len = sizeof(struct sockaddr_in);
3322 	} else {
3323 		size = min_t(socklen_t, *len, sizeof(struct sockaddr_in6));
3324 		*len = sizeof(struct sockaddr_in6);
3325 	}
3326 	memcpy(dst, src, size);
3327 }
3328 
3329 int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
3330 {
3331 	struct rsocket *rs;
3332 
3333 	rs = idm_lookup(&idm, socket);
3334 	if (!rs)
3335 		return ERR(EBADF);
3336 	if (rs->type == SOCK_STREAM) {
3337 		rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
3338 		return 0;
3339 	} else {
3340 		return getpeername(rs->udp_sock, addr, addrlen);
3341 	}
3342 }
3343 
3344 int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
3345 {
3346 	struct rsocket *rs;
3347 
3348 	rs = idm_lookup(&idm, socket);
3349 	if (!rs)
3350 		return ERR(EBADF);
3351 	if (rs->type == SOCK_STREAM) {
3352 		rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
3353 		return 0;
3354 	} else {
3355 		return getsockname(rs->udp_sock, addr, addrlen);
3356 	}
3357 }
3358 
3359 static int rs_set_keepalive(struct rsocket *rs, int on)
3360 {
3361 	FILE *f;
3362 	int ret;
3363 
3364 	if ((on && (rs->opts & RS_OPT_SVC_ACTIVE)) ||
3365 	    (!on && !(rs->opts & RS_OPT_SVC_ACTIVE)))
3366 		return 0;
3367 
3368 	if (on) {
3369 		if (!rs->keepalive_time) {
3370 			if ((f = fopen("/proc/sys/net/ipv4/tcp_keepalive_time", "r"))) {
3371 				if (fscanf(f, "%u", &rs->keepalive_time) != 1)
3372 					rs->keepalive_time = 7200;
3373 				fclose(f);
3374 			} else {
3375 				rs->keepalive_time = 7200;
3376 			}
3377 		}
3378 		ret = rs_notify_svc(&tcp_svc, rs, RS_SVC_ADD_KEEPALIVE);
3379 	} else {
3380 		ret = rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
3381 	}
3382 
3383 	return ret;
3384 }
3385 
3386 int rsetsockopt(int socket, int level, int optname,
3387 		const void *optval, socklen_t optlen)
3388 {
3389 	struct rsocket *rs;
3390 	int ret, opt_on = 0;
3391 	uint64_t *opts = NULL;
3392 
3393 	ret = ERR(ENOTSUP);
3394 	rs = idm_lookup(&idm, socket);
3395 	if (!rs)
3396 		return ERR(EBADF);
3397 	if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
3398 		ret = setsockopt(rs->udp_sock, level, optname, optval, optlen);
3399 		if (ret)
3400 			return ret;
3401 	}
3402 
3403 	switch (level) {
3404 	case SOL_SOCKET:
3405 		opts = &rs->so_opts;
3406 		switch (optname) {
3407 		case SO_REUSEADDR:
3408 			if (rs->type == SOCK_STREAM) {
3409 				ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
3410 						      RDMA_OPTION_ID_REUSEADDR,
3411 						      (void *) optval, optlen);
3412 				if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
3413 				    rs->cm_id->context &&
3414 				    (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
3415 					ret = 0;
3416 			}
3417 			opt_on = *(int *) optval;
3418 			break;
3419 		case SO_RCVBUF:
3420 			if ((rs->type == SOCK_STREAM && !rs->rbuf) ||
3421 			    (rs->type == SOCK_DGRAM && !rs->qp_list))
3422 				rs->rbuf_size = (*(uint32_t *) optval) << 1;
3423 			ret = 0;
3424 			break;
3425 		case SO_SNDBUF:
3426 			if (!rs->sbuf)
3427 				rs->sbuf_size = (*(uint32_t *) optval) << 1;
3428 			if (rs->sbuf_size < RS_SNDLOWAT)
3429 				rs->sbuf_size = RS_SNDLOWAT << 1;
3430 			ret = 0;
3431 			break;
3432 		case SO_LINGER:
3433 			/* Invert value so default so_opt = 0 is on */
3434 			opt_on =  !((struct linger *) optval)->l_onoff;
3435 			ret = 0;
3436 			break;
3437 		case SO_KEEPALIVE:
3438 			ret = rs_set_keepalive(rs, *(int *) optval);
3439 			opt_on = rs->opts & RS_OPT_SVC_ACTIVE;
3440 			break;
3441 		case SO_OOBINLINE:
3442 			opt_on = *(int *) optval;
3443 			ret = 0;
3444 			break;
3445 		default:
3446 			break;
3447 		}
3448 		break;
3449 	case IPPROTO_TCP:
3450 		opts = &rs->tcp_opts;
3451 		switch (optname) {
3452 		case TCP_KEEPCNT:
3453 		case TCP_KEEPINTVL:
3454 			ret = 0;   /* N/A - we're using a reliable connection */
3455 			break;
3456 		case TCP_KEEPIDLE:
3457 			if (*(int *) optval <= 0) {
3458 				ret = ERR(EINVAL);
3459 				break;
3460 			}
3461 			rs->keepalive_time = *(int *) optval;
3462 			ret = (rs->opts & RS_OPT_SVC_ACTIVE) ?
3463 			      rs_notify_svc(&tcp_svc, rs, RS_SVC_MOD_KEEPALIVE) : 0;
3464 			break;
3465 		case TCP_NODELAY:
3466 			opt_on = *(int *) optval;
3467 			ret = 0;
3468 			break;
3469 		case TCP_MAXSEG:
3470 			ret = 0;
3471 			break;
3472 		default:
3473 			break;
3474 		}
3475 		break;
3476 	case IPPROTO_IPV6:
3477 		opts = &rs->ipv6_opts;
3478 		switch (optname) {
3479 		case IPV6_V6ONLY:
3480 			if (rs->type == SOCK_STREAM) {
3481 				ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
3482 						      RDMA_OPTION_ID_AFONLY,
3483 						      (void *) optval, optlen);
3484 			}
3485 			opt_on = *(int *) optval;
3486 			break;
3487 		default:
3488 			break;
3489 		}
3490 		break;
3491 	case SOL_RDMA:
3492 		if (rs->state >= rs_opening) {
3493 			ret = ERR(EINVAL);
3494 			break;
3495 		}
3496 
3497 		switch (optname) {
3498 		case RDMA_SQSIZE:
3499 			rs->sq_size = min_t(uint32_t, (*(uint32_t *)optval),
3500 					    RS_QP_MAX_SIZE);
3501 			ret = 0;
3502 			break;
3503 		case RDMA_RQSIZE:
3504 			rs->rq_size = min_t(uint32_t, (*(uint32_t *)optval),
3505 					    RS_QP_MAX_SIZE);
3506 			ret = 0;
3507 			break;
3508 		case RDMA_INLINE:
3509 			rs->sq_inline = min_t(uint32_t, *(uint32_t *)optval,
3510 					      RS_QP_MAX_SIZE);
3511 			ret = 0;
3512 			break;
3513 		case RDMA_IOMAPSIZE:
3514 			rs->target_iomap_size = (uint16_t) rs_scale_to_value(
3515 				(uint8_t) rs_value_to_scale(*(int *) optval, 8), 8);
3516 			ret = 0;
3517 			break;
3518 		case RDMA_ROUTE:
3519 			if ((rs->optval = malloc(optlen))) {
3520 				memcpy(rs->optval, optval, optlen);
3521 				rs->optlen = optlen;
3522 				ret = 0;
3523 			} else {
3524 				ret = ERR(ENOMEM);
3525 			}
3526 			break;
3527 		default:
3528 			break;
3529 		}
3530 		break;
3531 	default:
3532 		break;
3533 	}
3534 
3535 	if (!ret && opts) {
3536 		if (opt_on)
3537 			*opts |= (1 << optname);
3538 		else
3539 			*opts &= ~(1 << optname);
3540 	}
3541 
3542 	return ret;
3543 }
3544 
3545 static void rs_convert_sa_path(struct ibv_sa_path_rec *sa_path,
3546 			       struct ibv_path_data *path_data)
3547 {
3548 	uint32_t fl_hop;
3549 
3550 	memset(path_data, 0, sizeof(*path_data));
3551 	path_data->path.dgid = sa_path->dgid;
3552 	path_data->path.sgid = sa_path->sgid;
3553 	path_data->path.dlid = sa_path->dlid;
3554 	path_data->path.slid = sa_path->slid;
3555 	fl_hop = be32toh(sa_path->flow_label) << 8;
3556 	path_data->path.flowlabel_hoplimit = htobe32(fl_hop | sa_path->hop_limit);
3557 	path_data->path.tclass = sa_path->traffic_class;
3558 	path_data->path.reversible_numpath = sa_path->reversible << 7 | 1;
3559 	path_data->path.pkey = sa_path->pkey;
3560 	path_data->path.qosclass_sl = htobe16(sa_path->sl);
3561 	path_data->path.mtu = sa_path->mtu | 2 << 6;	/* exactly */
3562 	path_data->path.rate = sa_path->rate | 2 << 6;
3563 	path_data->path.packetlifetime = sa_path->packet_life_time | 2 << 6;
3564 	path_data->flags= sa_path->preference;
3565 }
3566 
3567 int rgetsockopt(int socket, int level, int optname,
3568 		void *optval, socklen_t *optlen)
3569 {
3570 	struct rsocket *rs;
3571 	void *opt;
3572 	struct ibv_sa_path_rec *path_rec;
3573 	struct ibv_path_data path_data;
3574 	socklen_t len;
3575 	int ret = 0;
3576 	int num_paths;
3577 
3578 	rs = idm_lookup(&idm, socket);
3579 	if (!rs)
3580 		return ERR(EBADF);
3581 	switch (level) {
3582 	case SOL_SOCKET:
3583 		switch (optname) {
3584 		case SO_REUSEADDR:
3585 		case SO_KEEPALIVE:
3586 		case SO_OOBINLINE:
3587 			*((int *) optval) = !!(rs->so_opts & (1 << optname));
3588 			*optlen = sizeof(int);
3589 			break;
3590 		case SO_RCVBUF:
3591 			*((int *) optval) = rs->rbuf_size;
3592 			*optlen = sizeof(int);
3593 			break;
3594 		case SO_SNDBUF:
3595 			*((int *) optval) = rs->sbuf_size;
3596 			*optlen = sizeof(int);
3597 			break;
3598 		case SO_LINGER:
3599 			/* Value is inverted so default so_opt = 0 is on */
3600 			((struct linger *) optval)->l_onoff =
3601 					!(rs->so_opts & (1 << optname));
3602 			((struct linger *) optval)->l_linger = 0;
3603 			*optlen = sizeof(struct linger);
3604 			break;
3605 		case SO_ERROR:
3606 			*((int *) optval) = rs->err;
3607 			*optlen = sizeof(int);
3608 			rs->err = 0;
3609 			break;
3610 		default:
3611 			ret = ENOTSUP;
3612 			break;
3613 		}
3614 		break;
3615 	case IPPROTO_TCP:
3616 		switch (optname) {
3617 		case TCP_KEEPCNT:
3618 		case TCP_KEEPINTVL:
3619 			*((int *) optval) = 1;   /* N/A */
3620 			break;
3621 		case TCP_KEEPIDLE:
3622 			*((int *) optval) = (int) rs->keepalive_time;
3623 			*optlen = sizeof(int);
3624 			break;
3625 		case TCP_NODELAY:
3626 			*((int *) optval) = !!(rs->tcp_opts & (1 << optname));
3627 			*optlen = sizeof(int);
3628 			break;
3629 		case TCP_MAXSEG:
3630 			*((int *) optval) = (rs->cm_id && rs->cm_id->route.num_paths) ?
3631 					    1 << (7 + rs->cm_id->route.path_rec->mtu) :
3632 					    2048;
3633 			*optlen = sizeof(int);
3634 			break;
3635 		default:
3636 			ret = ENOTSUP;
3637 			break;
3638 		}
3639 		break;
3640 	case IPPROTO_IPV6:
3641 		switch (optname) {
3642 		case IPV6_V6ONLY:
3643 			*((int *) optval) = !!(rs->ipv6_opts & (1 << optname));
3644 			*optlen = sizeof(int);
3645 			break;
3646 		default:
3647 			ret = ENOTSUP;
3648 			break;
3649 		}
3650 		break;
3651 	case SOL_RDMA:
3652 		switch (optname) {
3653 		case RDMA_SQSIZE:
3654 			*((int *) optval) = rs->sq_size;
3655 			*optlen = sizeof(int);
3656 			break;
3657 		case RDMA_RQSIZE:
3658 			*((int *) optval) = rs->rq_size;
3659 			*optlen = sizeof(int);
3660 			break;
3661 		case RDMA_INLINE:
3662 			*((int *) optval) = rs->sq_inline;
3663 			*optlen = sizeof(int);
3664 			break;
3665 		case RDMA_IOMAPSIZE:
3666 			*((int *) optval) = rs->target_iomap_size;
3667 			*optlen = sizeof(int);
3668 			break;
3669 		case RDMA_ROUTE:
3670 			if (rs->optval) {
3671 				if (*optlen < rs->optlen) {
3672 					ret = EINVAL;
3673 				} else {
3674 					memcpy(rs->optval, optval, rs->optlen);
3675 					*optlen = rs->optlen;
3676 				}
3677 			} else {
3678 				if (*optlen < sizeof(path_data)) {
3679 					ret = EINVAL;
3680 				} else {
3681 					len = 0;
3682 					opt = optval;
3683 					path_rec = rs->cm_id->route.path_rec;
3684 					num_paths = 0;
3685 					while (len + sizeof(path_data) <= *optlen &&
3686 					       num_paths < rs->cm_id->route.num_paths) {
3687 						rs_convert_sa_path(path_rec, &path_data);
3688 						memcpy(opt, &path_data, sizeof(path_data));
3689 						len += sizeof(path_data);
3690 						opt += sizeof(path_data);
3691 						path_rec++;
3692 						num_paths++;
3693 					}
3694 					*optlen = len;
3695 					ret = 0;
3696 				}
3697 			}
3698 			break;
3699 		default:
3700 			ret = ENOTSUP;
3701 			break;
3702 		}
3703 		break;
3704 	default:
3705 		ret = ENOTSUP;
3706 		break;
3707 	}
3708 
3709 	return rdma_seterrno(ret);
3710 }
3711 
3712 int rfcntl(int socket, int cmd, ... /* arg */ )
3713 {
3714 	struct rsocket *rs;
3715 	va_list args;
3716 	int param;
3717 	int ret = 0;
3718 
3719 	rs = idm_lookup(&idm, socket);
3720 	if (!rs)
3721 		return ERR(EBADF);
3722 	va_start(args, cmd);
3723 	switch (cmd) {
3724 	case F_GETFL:
3725 		ret = rs->fd_flags;
3726 		break;
3727 	case F_SETFL:
3728 		param = va_arg(args, int);
3729 		if ((rs->fd_flags & O_NONBLOCK) != (param & O_NONBLOCK))
3730 			ret = rs_set_nonblocking(rs, param & O_NONBLOCK);
3731 
3732 		if (!ret)
3733 			rs->fd_flags = param;
3734 		break;
3735 	default:
3736 		ret = ERR(ENOTSUP);
3737 		break;
3738 	}
3739 	va_end(args);
3740 	return ret;
3741 }
3742 
3743 static struct rs_iomap_mr *rs_get_iomap_mr(struct rsocket *rs)
3744 {
3745 	int i;
3746 
3747 	if (!rs->remote_iomappings) {
3748 		rs->remote_iomappings = calloc(rs->remote_iomap.length,
3749 					       sizeof(*rs->remote_iomappings));
3750 		if (!rs->remote_iomappings)
3751 			return NULL;
3752 
3753 		for (i = 0; i < rs->remote_iomap.length; i++)
3754 			rs->remote_iomappings[i].index = i;
3755 	}
3756 
3757 	for (i = 0; i < rs->remote_iomap.length; i++) {
3758 		if (!rs->remote_iomappings[i].mr)
3759 			return &rs->remote_iomappings[i];
3760 	}
3761 	return NULL;
3762 }
3763 
3764 /*
3765  * If an offset is given, we map to it.  If offset is -1, then we map the
3766  * offset to the address of buf.  We do not check for conflicts, which must
3767  * be fixed at some point.
3768  */
3769 off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
3770 {
3771 	struct rsocket *rs;
3772 	struct rs_iomap_mr *iomr;
3773 	int access = IBV_ACCESS_LOCAL_WRITE;
3774 
3775 	rs = idm_at(&idm, socket);
3776 	if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
3777 		return ERR(EINVAL);
3778 
3779 	fastlock_acquire(&rs->map_lock);
3780 	if (prot & PROT_WRITE) {
3781 		iomr = rs_get_iomap_mr(rs);
3782 		access |= IBV_ACCESS_REMOTE_WRITE;
3783 	} else {
3784 		iomr = calloc(1, sizeof(*iomr));
3785 		iomr->index = -1;
3786 	}
3787 	if (!iomr) {
3788 		offset = ERR(ENOMEM);
3789 		goto out;
3790 	}
3791 
3792 	iomr->mr = ibv_reg_mr(rs->cm_id->pd, buf, len, access);
3793 	if (!iomr->mr) {
3794 		if (iomr->index < 0)
3795 			free(iomr);
3796 		offset = -1;
3797 		goto out;
3798 	}
3799 
3800 	if (offset == -1)
3801 		offset = (uintptr_t) buf;
3802 	iomr->offset = offset;
3803 	atomic_store(&iomr->refcnt, 1);
3804 
3805 	if (iomr->index >= 0) {
3806 		dlist_insert_tail(&iomr->entry, &rs->iomap_queue);
3807 		rs->iomap_pending = 1;
3808 	} else {
3809 		dlist_insert_tail(&iomr->entry, &rs->iomap_list);
3810 	}
3811 out:
3812 	fastlock_release(&rs->map_lock);
3813 	return offset;
3814 }
3815 
3816 int riounmap(int socket, void *buf, size_t len)
3817 {
3818 	struct rsocket *rs;
3819 	struct rs_iomap_mr *iomr;
3820 	dlist_entry *entry;
3821 	int ret = 0;
3822 
3823 	rs = idm_at(&idm, socket);
3824 	fastlock_acquire(&rs->map_lock);
3825 
3826 	for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
3827 	     entry = entry->next) {
3828 		iomr = container_of(entry, struct rs_iomap_mr, entry);
3829 		if (iomr->mr->addr == buf && iomr->mr->length == len) {
3830 			rs_release_iomap_mr(iomr);
3831 			goto out;
3832 		}
3833 	}
3834 
3835 	for (entry = rs->iomap_queue.next; entry != &rs->iomap_queue;
3836 	     entry = entry->next) {
3837 		iomr = container_of(entry, struct rs_iomap_mr, entry);
3838 		if (iomr->mr->addr == buf && iomr->mr->length == len) {
3839 			rs_release_iomap_mr(iomr);
3840 			goto out;
3841 		}
3842 	}
3843 	ret = ERR(EINVAL);
3844 out:
3845 	fastlock_release(&rs->map_lock);
3846 	return ret;
3847 }
3848 
3849 static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset)
3850 {
3851 	int i;
3852 
3853 	for (i = 0; i < rs->target_iomap_size; i++) {
3854 		if (offset >= rs->target_iomap[i].offset &&
3855 		    offset < rs->target_iomap[i].offset + rs->target_iomap[i].sge.length)
3856 			return &rs->target_iomap[i];
3857 	}
3858 	return NULL;
3859 }
3860 
3861 size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
3862 {
3863 	struct rsocket *rs;
3864 	struct rs_iomap *iom = NULL;
3865 	struct ibv_sge sge;
3866 	size_t left = count;
3867 	uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
3868 	int ret = 0;
3869 
3870 	rs = idm_at(&idm, socket);
3871 	fastlock_acquire(&rs->slock);
3872 	if (rs->iomap_pending) {
3873 		ret = rs_send_iomaps(rs, flags);
3874 		if (ret)
3875 			goto out;
3876 	}
3877 	for (; left; left -= xfer_size, buf += xfer_size, offset += xfer_size) {
3878 		if (!iom || offset > iom->offset + iom->sge.length) {
3879 			iom = rs_find_iomap(rs, offset);
3880 			if (!iom)
3881 				break;
3882 		}
3883 
3884 		if (!rs_can_send(rs)) {
3885 			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
3886 					  rs_conn_can_send);
3887 			if (ret)
3888 				break;
3889 			if (!(rs->state & rs_writable)) {
3890 				ret = ERR(ECONNRESET);
3891 				break;
3892 			}
3893 		}
3894 
3895 		if (olen < left) {
3896 			xfer_size = olen;
3897 			if (olen < RS_MAX_TRANSFER)
3898 				olen <<= 1;
3899 		} else {
3900 			xfer_size = left;
3901 		}
3902 
3903 		if (xfer_size > rs->sbuf_bytes_avail)
3904 			xfer_size = rs->sbuf_bytes_avail;
3905 		if (xfer_size > iom->offset + iom->sge.length - offset)
3906 			xfer_size = iom->offset + iom->sge.length - offset;
3907 
3908 		if (xfer_size <= rs->sq_inline) {
3909 			sge.addr = (uintptr_t) buf;
3910 			sge.length = xfer_size;
3911 			sge.lkey = 0;
3912 			ret = rs_write_direct(rs, iom, offset, &sge, 1,
3913 					      xfer_size, IBV_SEND_INLINE);
3914 		} else if (xfer_size <= rs_sbuf_left(rs)) {
3915 			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
3916 			rs->ssgl[0].length = xfer_size;
3917 			ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0);
3918 			if (xfer_size < rs_sbuf_left(rs))
3919 				rs->ssgl[0].addr += xfer_size;
3920 			else
3921 				rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
3922 		} else {
3923 			rs->ssgl[0].length = rs_sbuf_left(rs);
3924 			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
3925 				rs->ssgl[0].length);
3926 			rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
3927 			memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
3928 			ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0);
3929 			rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
3930 		}
3931 		if (ret)
3932 			break;
3933 	}
3934 out:
3935 	fastlock_release(&rs->slock);
3936 
3937 	return (ret && left == count) ? ret : count - left;
3938 }
3939 
3940 /****************************************************************************
3941  * Service Processing Threads
3942  ****************************************************************************/
3943 
3944 static int rs_svc_grow_sets(struct rs_svc *svc, int grow_size)
3945 {
3946 	struct rsocket **rss;
3947 	void *set, *contexts;
3948 
3949 	set = calloc(svc->size + grow_size, sizeof(*rss) + svc->context_size);
3950 	if (!set)
3951 		return ENOMEM;
3952 
3953 	svc->size += grow_size;
3954 	rss = set;
3955 	contexts = set + sizeof(*rss) * svc->size;
3956 	if (svc->cnt) {
3957 		memcpy(rss, svc->rss, sizeof(*rss) * (svc->cnt + 1));
3958 		memcpy(contexts, svc->contexts, svc->context_size * (svc->cnt + 1));
3959 	}
3960 
3961 	free(svc->rss);
3962 	svc->rss = rss;
3963 	svc->contexts = contexts;
3964 	return 0;
3965 }
3966 
3967 /*
3968  * Index 0 is reserved for the service's communication socket.
3969  */
3970 static int rs_svc_add_rs(struct rs_svc *svc, struct rsocket *rs)
3971 {
3972 	int ret;
3973 
3974 	if (svc->cnt >= svc->size - 1) {
3975 		ret = rs_svc_grow_sets(svc, 4);
3976 		if (ret)
3977 			return ret;
3978 	}
3979 
3980 	svc->rss[++svc->cnt] = rs;
3981 	return 0;
3982 }
3983 
3984 static int rs_svc_index(struct rs_svc *svc, struct rsocket *rs)
3985 {
3986 	int i;
3987 
3988 	for (i = 1; i <= svc->cnt; i++) {
3989 		if (svc->rss[i] == rs)
3990 			return i;
3991 	}
3992 	return -1;
3993 }
3994 
3995 static int rs_svc_rm_rs(struct rs_svc *svc, struct rsocket *rs)
3996 {
3997 	int i;
3998 
3999 	if ((i = rs_svc_index(svc, rs)) >= 0) {
4000 		svc->rss[i] = svc->rss[svc->cnt];
4001 		memcpy(svc->contexts + i * svc->context_size,
4002 		       svc->contexts + svc->cnt * svc->context_size,
4003 		       svc->context_size);
4004 		svc->cnt--;
4005 		return 0;
4006 	}
4007 	return EBADF;
4008 }
4009 
4010 static void udp_svc_process_sock(struct rs_svc *svc)
4011 {
4012 	struct rs_svc_msg msg;
4013 
4014 	read_all(svc->sock[1], &msg, sizeof msg);
4015 	switch (msg.cmd) {
4016 	case RS_SVC_ADD_DGRAM:
4017 		msg.status = rs_svc_add_rs(svc, msg.rs);
4018 		if (!msg.status) {
4019 			msg.rs->opts |= RS_OPT_SVC_ACTIVE;
4020 			udp_svc_fds = svc->contexts;
4021 			udp_svc_fds[svc->cnt].fd = msg.rs->udp_sock;
4022 			udp_svc_fds[svc->cnt].events = POLLIN;
4023 			udp_svc_fds[svc->cnt].revents = 0;
4024 		}
4025 		break;
4026 	case RS_SVC_REM_DGRAM:
4027 		msg.status = rs_svc_rm_rs(svc, msg.rs);
4028 		if (!msg.status)
4029 			msg.rs->opts &= ~RS_OPT_SVC_ACTIVE;
4030 		break;
4031 	case RS_SVC_NOOP:
4032 		msg.status = 0;
4033 		break;
4034 	default:
4035 		break;
4036 	}
4037 
4038 	write_all(svc->sock[1], &msg, sizeof msg);
4039 }
4040 
4041 static uint8_t udp_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid)
4042 {
4043 	union ibv_gid gid;
4044 	int i;
4045 
4046 	for (i = 0; i < 16; i++) {
4047 		ibv_query_gid(dest->qp->cm_id->verbs, dest->qp->cm_id->port_num,
4048 			      i, &gid);
4049 		if (!memcmp(sgid, &gid, sizeof gid))
4050 			return i;
4051 	}
4052 	return 0;
4053 }
4054 
4055 static uint8_t udp_svc_path_bits(struct ds_dest *dest)
4056 {
4057 	struct ibv_port_attr attr;
4058 
4059 	if (!ibv_query_port(dest->qp->cm_id->verbs, dest->qp->cm_id->port_num, &attr))
4060 		return (uint8_t) ((1 << attr.lmc) - 1);
4061 	return 0x7f;
4062 }
4063 
4064 static void udp_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn)
4065 {
4066 	union socket_addr saddr;
4067 	struct rdma_cm_id *id;
4068 	struct ibv_ah_attr attr;
4069 	int ret;
4070 
4071 	if (dest->ah) {
4072 		fastlock_acquire(&rs->slock);
4073 		ibv_destroy_ah(dest->ah);
4074 		dest->ah = NULL;
4075 		fastlock_release(&rs->slock);
4076 	}
4077 
4078 	ret = rdma_create_id(NULL, &id, NULL, dest->qp->cm_id->ps);
4079 	if  (ret)
4080 		return;
4081 
4082 	memcpy(&saddr, rdma_get_local_addr(dest->qp->cm_id),
4083 	       ucma_addrlen(rdma_get_local_addr(dest->qp->cm_id)));
4084 	if (saddr.sa.sa_family == AF_INET)
4085 		saddr.sin.sin_port = 0;
4086 	else
4087 		saddr.sin6.sin6_port = 0;
4088 	ret = rdma_resolve_addr(id, &saddr.sa, &dest->addr.sa, 2000);
4089 	if (ret)
4090 		goto out;
4091 
4092 	ret = rdma_resolve_route(id, 2000);
4093 	if (ret)
4094 		goto out;
4095 
4096 	memset(&attr, 0, sizeof attr);
4097 	if (id->route.path_rec->hop_limit > 1) {
4098 		attr.is_global = 1;
4099 		attr.grh.dgid = id->route.path_rec->dgid;
4100 		attr.grh.flow_label = be32toh(id->route.path_rec->flow_label);
4101 		attr.grh.sgid_index = udp_svc_sgid_index(dest, &id->route.path_rec->sgid);
4102 		attr.grh.hop_limit = id->route.path_rec->hop_limit;
4103 		attr.grh.traffic_class = id->route.path_rec->traffic_class;
4104 	}
4105 	attr.dlid = be16toh(id->route.path_rec->dlid);
4106 	attr.sl = id->route.path_rec->sl;
4107 	attr.src_path_bits = be16toh(id->route.path_rec->slid) & udp_svc_path_bits(dest);
4108 	attr.static_rate = id->route.path_rec->rate;
4109 	attr.port_num  = id->port_num;
4110 
4111 	fastlock_acquire(&rs->slock);
4112 	dest->qpn = qpn;
4113 	dest->ah = ibv_create_ah(dest->qp->cm_id->pd, &attr);
4114 	fastlock_release(&rs->slock);
4115 out:
4116 	rdma_destroy_id(id);
4117 }
4118 
4119 static int udp_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
4120 				 union socket_addr *addr)
4121 {
4122 	return (udp_hdr->tag == htobe32(DS_UDP_TAG)) &&
4123 		((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET &&
4124 		  udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
4125 		 (udp_hdr->version == 6 && addr->sa.sa_family == AF_INET6 &&
4126 		  udp_hdr->length == DS_UDP_IPV6_HDR_LEN));
4127 }
4128 
4129 static void udp_svc_forward(struct rsocket *rs, void *buf, size_t len,
4130 			    union socket_addr *src)
4131 {
4132 	struct ds_header hdr;
4133 	struct ds_smsg *msg;
4134 	struct ibv_sge sge;
4135 	uint64_t offset;
4136 
4137 	if (!ds_can_send(rs)) {
4138 		if (ds_get_comp(rs, 0, ds_can_send))
4139 			return;
4140 	}
4141 
4142 	msg = rs->smsg_free;
4143 	rs->smsg_free = msg->next;
4144 	rs->sqe_avail--;
4145 
4146 	ds_format_hdr(&hdr, src);
4147 	memcpy((void *) msg, &hdr, hdr.length);
4148 	memcpy((void *) msg + hdr.length, buf, len);
4149 	sge.addr = (uintptr_t) msg;
4150 	sge.length = hdr.length + len;
4151 	sge.lkey = rs->conn_dest->qp->smr->lkey;
4152 	offset = (uint8_t *) msg - rs->sbuf;
4153 
4154 	ds_post_send(rs, &sge, offset);
4155 }
4156 
4157 static void udp_svc_process_rs(struct rsocket *rs)
4158 {
4159 	static uint8_t buf[RS_SNDLOWAT];
4160 	struct ds_dest *dest, *cur_dest;
4161 	struct ds_udp_header *udp_hdr;
4162 	union socket_addr addr;
4163 	socklen_t addrlen = sizeof addr;
4164 	int len, ret;
4165 	uint32_t qpn;
4166 
4167 	ret = recvfrom(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen);
4168 	if (ret < DS_UDP_IPV4_HDR_LEN)
4169 		return;
4170 
4171 	udp_hdr = (struct ds_udp_header *) buf;
4172 	if (!udp_svc_valid_udp_hdr(udp_hdr, &addr))
4173 		return;
4174 
4175 	len = ret - udp_hdr->length;
4176 	qpn = be32toh(udp_hdr->qpn) & 0xFFFFFF;
4177 
4178 	udp_hdr->tag = (__force __be32)be32toh(udp_hdr->tag);
4179 	udp_hdr->qpn = (__force __be32)qpn;
4180 
4181 	ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
4182 	if (ret)
4183 		return;
4184 
4185 	if (udp_hdr->op == RS_OP_DATA) {
4186 		fastlock_acquire(&rs->slock);
4187 		cur_dest = rs->conn_dest;
4188 		rs->conn_dest = dest;
4189 		ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL);
4190 		rs->conn_dest = cur_dest;
4191 		fastlock_release(&rs->slock);
4192 	}
4193 
4194 	if (!dest->ah || (dest->qpn != qpn))
4195 		udp_svc_create_ah(rs, dest, qpn);
4196 
4197 	/* to do: handle when dest local ip address doesn't match udp ip */
4198 	if (udp_hdr->op == RS_OP_DATA) {
4199 		fastlock_acquire(&rs->slock);
4200 		cur_dest = rs->conn_dest;
4201 		rs->conn_dest = &dest->qp->dest;
4202 		udp_svc_forward(rs, buf + udp_hdr->length, len, &addr);
4203 		rs->conn_dest = cur_dest;
4204 		fastlock_release(&rs->slock);
4205 	}
4206 }
4207 
4208 static void *udp_svc_run(void *arg)
4209 {
4210 	struct rs_svc *svc = arg;
4211 	struct rs_svc_msg msg;
4212 	int i, ret;
4213 
4214 	ret = rs_svc_grow_sets(svc, 4);
4215 	if (ret) {
4216 		msg.status = ret;
4217 		write_all(svc->sock[1], &msg, sizeof msg);
4218 		return (void *) (uintptr_t) ret;
4219 	}
4220 
4221 	udp_svc_fds = svc->contexts;
4222 	udp_svc_fds[0].fd = svc->sock[1];
4223 	udp_svc_fds[0].events = POLLIN;
4224 	do {
4225 		for (i = 0; i <= svc->cnt; i++)
4226 			udp_svc_fds[i].revents = 0;
4227 
4228 		poll(udp_svc_fds, svc->cnt + 1, -1);
4229 		if (udp_svc_fds[0].revents)
4230 			udp_svc_process_sock(svc);
4231 
4232 		for (i = 1; i <= svc->cnt; i++) {
4233 			if (udp_svc_fds[i].revents)
4234 				udp_svc_process_rs(svc->rss[i]);
4235 		}
4236 	} while (svc->cnt >= 1);
4237 
4238 	return NULL;
4239 }
4240 
4241 static uint32_t rs_get_time(void)
4242 {
4243 	struct timeval now;
4244 
4245 	memset(&now, 0, sizeof now);
4246 	gettimeofday(&now, NULL);
4247 	return (uint32_t) now.tv_sec;
4248 }
4249 
4250 static void tcp_svc_process_sock(struct rs_svc *svc)
4251 {
4252 	struct rs_svc_msg msg;
4253 	int i;
4254 
4255 	read_all(svc->sock[1], &msg, sizeof msg);
4256 	switch (msg.cmd) {
4257 	case RS_SVC_ADD_KEEPALIVE:
4258 		msg.status = rs_svc_add_rs(svc, msg.rs);
4259 		if (!msg.status) {
4260 			msg.rs->opts |= RS_OPT_SVC_ACTIVE;
4261 			tcp_svc_timeouts = svc->contexts;
4262 			tcp_svc_timeouts[svc->cnt] = rs_get_time() +
4263 						     msg.rs->keepalive_time;
4264 		}
4265 		break;
4266 	case RS_SVC_REM_KEEPALIVE:
4267 		msg.status = rs_svc_rm_rs(svc, msg.rs);
4268 		if (!msg.status)
4269 			msg.rs->opts &= ~RS_OPT_SVC_ACTIVE;
4270 		break;
4271 	case RS_SVC_MOD_KEEPALIVE:
4272 		i = rs_svc_index(svc, msg.rs);
4273 		if (i >= 0) {
4274 			tcp_svc_timeouts[i] = rs_get_time() + msg.rs->keepalive_time;
4275 			msg.status = 0;
4276 		} else {
4277 			msg.status = EBADF;
4278 		}
4279 		break;
4280 	case RS_SVC_NOOP:
4281 		msg.status = 0;
4282 		break;
4283 	default:
4284 		break;
4285 	}
4286 	write_all(svc->sock[1], &msg, sizeof msg);
4287 }
4288 
4289 /*
4290  * Send a 0 byte RDMA write with immediate as keep-alive message.
4291  * This avoids the need for the receive side to do any acknowledgment.
4292  */
4293 static void tcp_svc_send_keepalive(struct rsocket *rs)
4294 {
4295 	fastlock_acquire(&rs->cq_lock);
4296 	if (rs_ctrl_avail(rs) && (rs->state & rs_connected)) {
4297 		rs->ctrl_seqno++;
4298 		rs_post_write(rs, NULL, 0, rs_msg_set(RS_OP_CTRL, RS_CTRL_KEEPALIVE),
4299 			      0, (uintptr_t) NULL, (uintptr_t) NULL);
4300 	}
4301 	fastlock_release(&rs->cq_lock);
4302 }
4303 
4304 static void *tcp_svc_run(void *arg)
4305 {
4306 	struct rs_svc *svc = arg;
4307 	struct rs_svc_msg msg;
4308 	struct pollfd fds;
4309 	uint32_t now, next_timeout;
4310 	int i, ret, timeout;
4311 
4312 	ret = rs_svc_grow_sets(svc, 16);
4313 	if (ret) {
4314 		msg.status = ret;
4315 		write_all(svc->sock[1], &msg, sizeof msg);
4316 		return (void *) (uintptr_t) ret;
4317 	}
4318 
4319 	tcp_svc_timeouts = svc->contexts;
4320 	fds.fd = svc->sock[1];
4321 	fds.events = POLLIN;
4322 	timeout = -1;
4323 	do {
4324 		poll(&fds, 1, timeout * 1000);
4325 		if (fds.revents)
4326 			tcp_svc_process_sock(svc);
4327 
4328 		now = rs_get_time();
4329 		next_timeout = ~0;
4330 		for (i = 1; i <= svc->cnt; i++) {
4331 			if (tcp_svc_timeouts[i] <= now) {
4332 				tcp_svc_send_keepalive(svc->rss[i]);
4333 				tcp_svc_timeouts[i] =
4334 					now + svc->rss[i]->keepalive_time;
4335 			}
4336 			if (tcp_svc_timeouts[i] < next_timeout)
4337 				next_timeout = tcp_svc_timeouts[i];
4338 		}
4339 		timeout = (int) (next_timeout - now);
4340 	} while (svc->cnt >= 1);
4341 
4342 	return NULL;
4343 }
4344