1 /*
2  * Copyright (c) 2009-2012 Red Hat, Inc.
3  *
4  * All rights reserved.
5  *
6  * Author: Steven Dake (sdake@redhat.com)
7 
8  * This software licensed under BSD license, the text of which follows:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions are met:
12  *
13  * - Redistributions of source code must retain the above copyright notice,
14  *   this list of conditions and the following disclaimer.
15  * - Redistributions in binary form must reproduce the above copyright notice,
16  *   this list of conditions and the following disclaimer in the documentation
17  *   and/or other materials provided with the distribution.
18  * - Neither the name of the MontaVista Software, Inc. nor the names of its
19  *   contributors may be used to endorse or promote products derived from this
20  *   software without specific prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32  * THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include <config.h>
36 
37 #include <assert.h>
38 #include <pthread.h>
39 #include <sys/mman.h>
40 #include <sys/types.h>
41 #include <sys/stat.h>
42 #include <sys/socket.h>
43 #include <netdb.h>
44 #include <sys/un.h>
45 #include <sys/ioctl.h>
46 #include <sys/param.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
49 #include <unistd.h>
50 #include <fcntl.h>
51 #include <stdlib.h>
52 #include <stdio.h>
53 #include <errno.h>
54 #include <sched.h>
55 #include <time.h>
56 #include <sys/time.h>
57 #include <sys/poll.h>
58 #include <limits.h>
59 #include <stdio.h>
60 #include <string.h>
61 #include <stdlib.h>
62 #include <sys/types.h>
63 #include <sys/socket.h>
64 #include <netdb.h>
65 #include <rdma/rdma_cma.h>
66 #include <assert.h>
67 #include <errno.h>
68 
69 #include <corosync/sq.h>
70 #include <corosync/list.h>
71 #include <corosync/hdb.h>
72 #include <corosync/swab.h>
73 
74 #include <qb/qbdefs.h>
75 #include <qb/qbloop.h>
76 #define LOGSYS_UTILS_ONLY 1
77 #include <corosync/logsys.h>
78 #include "totemiba.h"
79 
80 #define COMPLETION_QUEUE_ENTRIES 100
81 
82 #define TOTAL_READ_POSTS 100
83 
84 #define MAX_MTU_SIZE 4096
85 
86 #define MCAST_REJOIN_MSEC	100
87 
/*
 * State for one totemiba transport instance built on the RDMA CM and verbs
 * APIs. It groups three sets of RDMA resources: the multicast endpoint
 * (mcast_*), the endpoint that accepts incoming token connections
 * (recv_token_*, listen_recv_token_*) and the endpoint used to send the
 * token (send_token_*).
 *
 * NOTE(review): the addresses below are stored as plain struct sockaddr,
 * which is typically too small to hold an AF_INET6 address - confirm that
 * only IPv4-sized addresses are ever stored here.
 */
struct totemiba_instance {
	/* source address passed to rdma_resolve_addr () for the send token id */
	struct sockaddr bind_addr;

	/* address the send token id is bound to with rdma_bind_addr () */
	struct sockaddr send_token_bind_addr;

	/* multicast group address joined/left on mcast_cma_id */
	struct sockaddr mcast_addr;

	/* destination address resolved for the send token id */
	struct sockaddr token_addr;

	struct sockaddr local_mcast_bind_addr;

	struct totem_interface *totem_interface;

	struct totem_config *totem_config;

	totemsrp_stats_t *stats;

	/* called with my_id the first time a multicast join is observed */
	void (*totemiba_iface_change_fn) (
		void *context,
		const struct totem_ip_address *iface_address);

	/* called by iba_deliver_fn () with the payload that follows the GRH */
	void (*totemiba_deliver_fn) (
		void *context,
		const void *msg,
		unsigned int msg_len);

	/* called when the send token id reports RDMA_CM_EVENT_ESTABLISHED */
	void (*totemiba_target_set_completed) (
		void *context);

	/* opaque context handed back to the three callbacks above */
	void *rrp_context;

	qb_loop_timer_handle timer_netif_check_timeout;

	/* qb loop on which completion channel fds and timers are registered */
	qb_loop_t *totemiba_poll_handle;

	struct totem_ip_address my_id;

	/*
	 * Multicast endpoint
	 */
	struct rdma_event_channel *mcast_channel;

	struct rdma_cm_id *mcast_cma_id;

	struct ibv_pd *mcast_pd;

	struct sockaddr mcast_dest_addr;

	/* queue pair number taken from the RDMA_CM_EVENT_MULTICAST_JOIN event */
	uint32_t mcast_qpn;

	/* qkey taken from the RDMA_CM_EVENT_MULTICAST_JOIN event */
	uint32_t mcast_qkey;

	/* address handle created on multicast join, destroyed by mcast_rejoin () */
	struct ibv_ah *mcast_ah;

	struct ibv_comp_channel *mcast_send_completion_channel;

	struct ibv_comp_channel *mcast_recv_completion_channel;

	struct ibv_cq *mcast_send_cq;

	struct ibv_cq *mcast_recv_cq;

	/*
	 * Token receive endpoint
	 */

	/* set to 1 once recv_token_accept_setup () has completed */
	int recv_token_accepted;

	struct rdma_event_channel *recv_token_channel;

	/* event channel read by recv_token_rdma_event_fn () */
	struct rdma_event_channel *listen_recv_token_channel;

	struct rdma_cm_id *listen_recv_token_cma_id;

	/* id of the most recent connect request */
	struct rdma_cm_id *recv_token_cma_id;

	struct ibv_pd *recv_token_pd;

	struct sockaddr recv_token_dest_addr;

	struct ibv_comp_channel *recv_token_send_completion_channel;

	struct ibv_comp_channel *recv_token_recv_completion_channel;

	struct ibv_cq *recv_token_send_cq;

	struct ibv_cq *recv_token_recv_cq;

	/*
	 * Token send endpoint
	 */
	int send_token_bound;

	struct rdma_event_channel *send_token_channel;

	struct rdma_cm_id *send_token_cma_id;

	struct ibv_pd *send_token_pd;

	struct sockaddr send_token_dest_addr;

	/* queue pair number taken from the RDMA_CM_EVENT_ESTABLISHED event */
	uint32_t send_token_qpn;

	/* qkey taken from the RDMA_CM_EVENT_ESTABLISHED event */
	uint32_t send_token_qkey;

	/* address handle created when the token connection is established */
	struct ibv_ah *send_token_ah;

	struct ibv_comp_channel *send_token_send_completion_channel;

	struct ibv_comp_channel *send_token_recv_completion_channel;

	struct ibv_cq *send_token_send_cq;

	struct ibv_cq *send_token_recv_cq;

	/* logging callback used by the log_printf () macro */
        void (*totemiba_log_printf) (
		int level,
		int subsys,
		const char *function,
		const char *file,
		int line,
		const char *format,
		...)__attribute__((format(printf, 6, 7)));


	/* subsystem id passed as second argument to totemiba_log_printf */
	int totemiba_subsys_id;

	/* idle multicast send buffers, linked through send_buf.list_free */
	struct list_head mcast_send_buf_free;

	/* idle token send buffers, linked through send_buf.list_free */
	struct list_head token_send_buf_free;

	/* every allocated multicast send buffer, linked through send_buf.list_all */
	struct list_head mcast_send_buf_head;

	/* every allocated token send buffer, linked through send_buf.list_all */
	struct list_head token_send_buf_head;

	/* every token receive buffer, linked through recv_buf.list_all */
	struct list_head recv_token_recv_buf_head;

	/* set to 1 after the first multicast join has run the iface callback */
	int mcast_seen_joined;

	/* timer handle of the pending multicast rejoin attempt */
	qb_loop_timer_handle mcast_rejoin;
};
/*
 * Overlay used by void2wrid () and wrid2void () to carry a pointer inside
 * the 64-bit work request id of a verbs work request.
 */
union u {
	uint64_t wr_id;
	void *v;
};
223 
/*
 * Log through the per-instance logging callback.
 *
 * A variable named "instance" that provides totemiba_log_printf and
 * totemiba_subsys_id must be in scope at the call site.
 *
 * The expansion intentionally carries no trailing semicolon: the caller's
 * own ";" terminates the statement, which keeps the macro usable as the
 * body of an unbraced if that is followed by an else.
 */
#define log_printf(level, format, args...)			\
do {								\
	instance->totemiba_log_printf (				\
			level,					\
			instance->totemiba_subsys_id,		\
			__FUNCTION__, __FILE__, __LINE__,	\
			(const char *)format, ##args);		\
} while (0)
232 
/*
 * One receive buffer together with the work request that posts it.
 */
struct recv_buf {
	/* linkage used by recv_token_recv_buf_create () to track the buffer */
	struct list_head list_all;
	/* receive work request handed to ibv_post_recv () */
	struct ibv_recv_wr recv_wr;
	/* single scatter/gather entry describing buffer */
	struct ibv_sge sge;
	/* memory registration covering buffer */
	struct ibv_mr *mr;
	/* payload area preceded by room for the global routing header */
	char buffer[MAX_MTU_SIZE + sizeof (struct ibv_grh)];
};
240 
/*
 * One registered send buffer.
 */
struct send_buf {
	/* linkage on the owning free list while the buffer is idle */
	struct list_head list_free;
	/* linkage on the list of all allocated buffers of its kind */
	struct list_head list_all;
	/* memory registration covering buffer */
	struct ibv_mr *mr;
	/* message payload */
	char buffer[MAX_MTU_SIZE];
};
247 
248 static hdb_handle_t
void2wrid(void * v)249 void2wrid (void *v) { union u u; u.v = v; return u.wr_id; }
250 
251 static void *
wrid2void(uint64_t wr_id)252 wrid2void (uint64_t wr_id) { union u u; u.wr_id = wr_id; return u.v; }
253 
totemiba_instance_initialize(struct totemiba_instance * instance)254 static void totemiba_instance_initialize (struct totemiba_instance *instance)
255 {
256 	memset (instance, 0, sizeof (struct totemiba_instance));
257 	list_init (&instance->mcast_send_buf_free);
258 	list_init (&instance->token_send_buf_free);
259 	list_init (&instance->mcast_send_buf_head);
260 	list_init (&instance->token_send_buf_head);
261 	list_init (&instance->recv_token_recv_buf_head);
262 }
263 
mcast_send_buf_get(struct totemiba_instance * instance)264 static inline struct send_buf *mcast_send_buf_get (
265 	struct totemiba_instance *instance)
266 {
267 	struct send_buf *send_buf;
268 
269 	if (list_empty (&instance->mcast_send_buf_free) == 0) {
270 		send_buf = list_entry (instance->mcast_send_buf_free.next, struct send_buf, list_free);
271 		list_del (&send_buf->list_free);
272 		return (send_buf);
273 	}
274 
275 	send_buf = malloc (sizeof (struct send_buf));
276 	if (send_buf == NULL) {
277 		return (NULL);
278 	}
279 	send_buf->mr = ibv_reg_mr (instance->mcast_pd,
280 		send_buf->buffer,
281 		MAX_MTU_SIZE, IBV_ACCESS_LOCAL_WRITE);
282 	if (send_buf->mr == NULL) {
283 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't register memory range");
284 		free (send_buf);
285 		return (NULL);
286 	}
287 	list_init (&send_buf->list_all);
288 	list_add_tail (&send_buf->list_all, &instance->mcast_send_buf_head);
289 
290 	return (send_buf);
291 }
292 
mcast_send_buf_put(struct totemiba_instance * instance,struct send_buf * send_buf)293 static inline void mcast_send_buf_put (
294 	struct totemiba_instance *instance,
295 	struct send_buf *send_buf)
296 {
297 	list_init (&send_buf->list_free);
298 	list_add_tail (&send_buf->list_free, &instance->mcast_send_buf_free);
299 }
300 
token_send_buf_get(struct totemiba_instance * instance)301 static inline struct send_buf *token_send_buf_get (
302 	struct totemiba_instance *instance)
303 {
304 	struct send_buf *send_buf;
305 
306 	if (list_empty (&instance->token_send_buf_free) == 0) {
307 		send_buf = list_entry (instance->token_send_buf_free.next, struct send_buf, list_free);
308 		list_del (&send_buf->list_free);
309 		return (send_buf);
310 	}
311 
312 	send_buf = malloc (sizeof (struct send_buf));
313 	if (send_buf == NULL) {
314 		return (NULL);
315 	}
316 	send_buf->mr = ibv_reg_mr (instance->send_token_pd,
317 		send_buf->buffer,
318 		MAX_MTU_SIZE, IBV_ACCESS_LOCAL_WRITE);
319 	if (send_buf->mr == NULL) {
320 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't register memory range");
321 		free (send_buf);
322 		return (NULL);
323 	}
324 	list_init (&send_buf->list_all);
325 	list_add_tail (&send_buf->list_all, &instance->token_send_buf_head);
326 
327 	return (send_buf);
328 }
329 
token_send_buf_destroy(struct totemiba_instance * instance)330 static inline void token_send_buf_destroy (struct totemiba_instance *instance)
331 {
332 	struct list_head *list;
333 	struct send_buf *send_buf;
334 
335         for (list = instance->token_send_buf_head.next; list != &instance->token_send_buf_head;) {
336                 send_buf = list_entry (list, struct send_buf, list_all);
337 		list = list->next;
338 		ibv_dereg_mr (send_buf->mr);
339 		free (send_buf);
340 	}
341 
342 	list_init (&instance->token_send_buf_free);
343 	list_init (&instance->token_send_buf_head);
344 }
345 
token_send_buf_put(struct totemiba_instance * instance,struct send_buf * send_buf)346 static inline void token_send_buf_put (
347 	struct totemiba_instance *instance,
348 	struct send_buf *send_buf)
349 {
350 	list_init (&send_buf->list_free);
351 	list_add_tail (&send_buf->list_free, &instance->token_send_buf_free);
352 }
353 
recv_token_recv_buf_create(struct totemiba_instance * instance)354 static inline struct recv_buf *recv_token_recv_buf_create (
355 	struct totemiba_instance *instance)
356 {
357 	struct recv_buf *recv_buf;
358 
359 	recv_buf = malloc (sizeof (struct recv_buf));
360 	if (recv_buf == NULL) {
361 		return (NULL);
362 	}
363 
364 	recv_buf->mr = ibv_reg_mr (instance->recv_token_pd, &recv_buf->buffer,
365 		MAX_MTU_SIZE + sizeof (struct ibv_grh),
366 		IBV_ACCESS_LOCAL_WRITE);
367 
368 	recv_buf->recv_wr.next = NULL;
369 	recv_buf->recv_wr.sg_list = &recv_buf->sge;
370 	recv_buf->recv_wr.num_sge = 1;
371 	recv_buf->recv_wr.wr_id = (uintptr_t)recv_buf;
372 
373 	recv_buf->sge.length = MAX_MTU_SIZE + sizeof (struct ibv_grh);
374 	recv_buf->sge.lkey = recv_buf->mr->lkey;
375 	recv_buf->sge.addr = (uintptr_t)recv_buf->buffer;
376 
377 	list_init (&recv_buf->list_all);
378 	list_add (&recv_buf->list_all, &instance->recv_token_recv_buf_head);
379 	return (recv_buf);
380 }
381 
recv_token_recv_buf_post(struct totemiba_instance * instance,struct recv_buf * recv_buf)382 static inline int recv_token_recv_buf_post (struct totemiba_instance *instance, struct recv_buf *recv_buf)
383 {
384 	struct ibv_recv_wr *fail_recv;
385 	int res;
386 
387 	res = ibv_post_recv (instance->recv_token_cma_id->qp, &recv_buf->recv_wr, &fail_recv);
388 
389 	return (res);
390 }
391 
recv_token_recv_buf_post_initial(struct totemiba_instance * instance)392 static inline void recv_token_recv_buf_post_initial (struct totemiba_instance *instance)
393 {
394 	struct recv_buf *recv_buf;
395 	unsigned int i;
396 
397 	for (i = 0; i < TOTAL_READ_POSTS; i++) {
398 		recv_buf = recv_token_recv_buf_create (instance);
399 
400 		recv_token_recv_buf_post (instance, recv_buf);
401 	}
402 }
403 
recv_token_recv_buf_post_destroy(struct totemiba_instance * instance)404 static inline void recv_token_recv_buf_post_destroy (
405 	struct totemiba_instance *instance)
406 {
407 	struct recv_buf *recv_buf;
408 	struct list_head *list;
409 
410 	for (list = instance->recv_token_recv_buf_head.next;
411 		list != &instance->recv_token_recv_buf_head;) {
412 
413 		recv_buf = list_entry (list, struct recv_buf, list_all);
414 		list = list->next;
415 		ibv_dereg_mr (recv_buf->mr);
416 		free (recv_buf);
417 	}
418 	list_init (&instance->recv_token_recv_buf_head);
419 }
420 
mcast_recv_buf_create(struct totemiba_instance * instance)421 static inline struct recv_buf *mcast_recv_buf_create (struct totemiba_instance *instance)
422 {
423 	struct recv_buf *recv_buf;
424 	struct ibv_mr *mr;
425 
426 	recv_buf = malloc (sizeof (struct recv_buf));
427 	if (recv_buf == NULL) {
428 		return (NULL);
429 	}
430 
431 	mr = ibv_reg_mr (instance->mcast_pd, &recv_buf->buffer,
432 		MAX_MTU_SIZE + sizeof (struct ibv_grh),
433 		IBV_ACCESS_LOCAL_WRITE);
434 
435 	recv_buf->recv_wr.next = NULL;
436 	recv_buf->recv_wr.sg_list = &recv_buf->sge;
437 	recv_buf->recv_wr.num_sge = 1;
438 	recv_buf->recv_wr.wr_id = (uintptr_t)recv_buf;
439 
440 	recv_buf->sge.length = MAX_MTU_SIZE + sizeof (struct ibv_grh);
441 	recv_buf->sge.lkey = mr->lkey;
442 	recv_buf->sge.addr = (uintptr_t)recv_buf->buffer;
443 
444 	return (recv_buf);
445 }
446 
mcast_recv_buf_post(struct totemiba_instance * instance,struct recv_buf * recv_buf)447 static inline int mcast_recv_buf_post (struct totemiba_instance *instance, struct recv_buf *recv_buf)
448 {
449 	struct ibv_recv_wr *fail_recv;
450 	int res;
451 
452 	res = ibv_post_recv (instance->mcast_cma_id->qp, &recv_buf->recv_wr, &fail_recv);
453 
454 	return (res);
455 }
456 
mcast_recv_buf_post_initial(struct totemiba_instance * instance)457 static inline void mcast_recv_buf_post_initial (struct totemiba_instance *instance)
458 {
459 	struct recv_buf *recv_buf;
460 	unsigned int i;
461 
462 	for (i = 0; i < TOTAL_READ_POSTS; i++) {
463 		recv_buf = mcast_recv_buf_create (instance);
464 
465 		mcast_recv_buf_post (instance, recv_buf);
466 	}
467 }
468 
iba_deliver_fn(struct totemiba_instance * instance,uint64_t wr_id,uint32_t bytes)469 static inline void iba_deliver_fn (struct totemiba_instance *instance, uint64_t wr_id, uint32_t bytes)
470 {
471 	const char *addr;
472 	const struct recv_buf *recv_buf;
473 
474 	recv_buf = wrid2void(wr_id);
475 	addr = &recv_buf->buffer[sizeof (struct ibv_grh)];
476 
477 	bytes -= sizeof (struct ibv_grh);
478 	instance->totemiba_deliver_fn (instance->rrp_context, addr, bytes);
479 }
480 
mcast_cq_send_event_fn(int fd,int events,void * context)481 static int mcast_cq_send_event_fn (int fd, int events, void *context)
482 {
483 	struct totemiba_instance *instance = (struct totemiba_instance *)context;
484 	struct ibv_wc wc[32];
485 	struct ibv_cq *ev_cq;
486 	void *ev_ctx;
487 	int res;
488 	int i;
489 
490 	ibv_get_cq_event (instance->mcast_send_completion_channel, &ev_cq, &ev_ctx);
491 	ibv_ack_cq_events (ev_cq, 1);
492 	res = ibv_req_notify_cq (ev_cq, 0);
493 
494 	res = ibv_poll_cq (instance->mcast_send_cq, 32, wc);
495 	if (res > 0) {
496 		for (i = 0; i < res; i++) {
497 			mcast_send_buf_put (instance, wrid2void(wc[i].wr_id));
498 		}
499 	}
500 
501 	return (0);
502 }
503 
mcast_cq_recv_event_fn(int fd,int events,void * context)504 static int mcast_cq_recv_event_fn (int fd, int events, void *context)
505 {
506 	struct totemiba_instance *instance = (struct totemiba_instance *)context;
507 	struct ibv_wc wc[64];
508 	struct ibv_cq *ev_cq;
509 	void *ev_ctx;
510 	int res;
511 	int i;
512 
513 	ibv_get_cq_event (instance->mcast_recv_completion_channel, &ev_cq, &ev_ctx);
514 	ibv_ack_cq_events (ev_cq, 1);
515 	res = ibv_req_notify_cq (ev_cq, 0);
516 
517 	res = ibv_poll_cq (instance->mcast_recv_cq, 64, wc);
518 	if (res > 0) {
519 		for (i = 0; i < res; i++) {
520 			iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
521 			mcast_recv_buf_post (instance, wrid2void(wc[i].wr_id));
522 		}
523 	}
524 
525 	return (0);
526 }
527 
mcast_rejoin(void * data)528 static void mcast_rejoin (void *data)
529 {
530 	int res;
531 	struct totemiba_instance *instance = (struct totemiba_instance *)data;
532 
533 	res = rdma_leave_multicast (instance->mcast_cma_id, &instance->mcast_addr);
534 	if (instance->mcast_ah) {
535 		ibv_destroy_ah (instance->mcast_ah);
536 		instance->mcast_ah = 0;
537 	}
538 
539 	res = rdma_join_multicast (instance->mcast_cma_id, &instance->mcast_addr, instance);
540 	if (res != 0) {
541 		log_printf (LOGSYS_LEVEL_DEBUG,
542 		    "rdma_join_multicast failed, errno=%d, rejoining in %u ms",
543 		    errno,
544 		    MCAST_REJOIN_MSEC);
545 		qb_loop_timer_add (instance->totemiba_poll_handle,
546 			QB_LOOP_MED,
547 			MCAST_REJOIN_MSEC * QB_TIME_NS_IN_MSEC,
548 			(void *)instance,
549 			mcast_rejoin,
550 			&instance->mcast_rejoin);
551 	}
552 }
553 
mcast_rdma_event_fn(int fd,int events,void * context)554 static int mcast_rdma_event_fn (int fd, int events, void *context)
555 {
556 	struct totemiba_instance *instance = (struct totemiba_instance *)context;
557 	struct rdma_cm_event *event;
558 
559 	int res;
560 
561 	res = rdma_get_cm_event (instance->mcast_channel, &event);
562 	if (res != 0) {
563 		return (0);
564 	}
565 
566 	switch (event->event) {
567 	/*
568 	 * occurs when we resolve the multicast address
569 	 */
570 	case RDMA_CM_EVENT_ADDR_RESOLVED:
571 		res = rdma_join_multicast (instance->mcast_cma_id, &instance->mcast_addr, instance);
572 		usleep(1000);
573 		if (res == 0) break;
574 	case RDMA_CM_EVENT_MULTICAST_ERROR:
575 		log_printf (LOGSYS_LEVEL_ERROR, "multicast error, trying to rejoin in %u ms", MCAST_REJOIN_MSEC);
576 		qb_loop_timer_add (instance->totemiba_poll_handle,
577 			QB_LOOP_MED,
578 			MCAST_REJOIN_MSEC * QB_TIME_NS_IN_MSEC,
579 			(void *)instance,
580 			mcast_rejoin,
581 			&instance->mcast_rejoin);
582 		break;
583 	/*
584 	 * occurs when the CM joins the multicast group
585 	 */
586 	case RDMA_CM_EVENT_MULTICAST_JOIN:
587 		instance->mcast_qpn = event->param.ud.qp_num;
588 		instance->mcast_qkey = event->param.ud.qkey;
589 		instance->mcast_ah = ibv_create_ah (instance->mcast_pd, &event->param.ud.ah_attr);
590 
591 		if (instance->mcast_seen_joined == 0) {
592 			log_printf (LOGSYS_LEVEL_DEBUG, "joining mcast 1st time, running callbacks");
593 			instance->totemiba_iface_change_fn (instance->rrp_context, &instance->my_id);
594 			instance->mcast_seen_joined=1;
595 		}
596 		log_printf (LOGSYS_LEVEL_NOTICE, "Joined multicast!");
597 		break;
598 	case RDMA_CM_EVENT_ADDR_ERROR:
599 	case RDMA_CM_EVENT_ROUTE_ERROR:
600 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
601 		break;
602 	default:
603 		log_printf (LOGSYS_LEVEL_ERROR, "default %d", event->event);
604 		break;
605 	}
606 
607 	rdma_ack_cm_event (event);
608 	return (0);
609 }
610 
recv_token_cq_send_event_fn(int fd,int revents,void * context)611 static int recv_token_cq_send_event_fn (
612 	int fd,
613 	int revents,
614 	void *context)
615 {
616 	struct totemiba_instance *instance = (struct totemiba_instance *)context;
617 	struct ibv_wc wc[32];
618 	struct ibv_cq *ev_cq;
619 	void *ev_ctx;
620 	int res;
621 	int i;
622 
623 	ibv_get_cq_event (instance->recv_token_send_completion_channel, &ev_cq, &ev_ctx);
624 	ibv_ack_cq_events (ev_cq, 1);
625 	res = ibv_req_notify_cq (ev_cq, 0);
626 
627 	res = ibv_poll_cq (instance->recv_token_send_cq, 32, wc);
628 	if (res > 0) {
629 		for (i = 0; i < res; i++) {
630 			iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
631 			ibv_dereg_mr (wrid2void(wc[i].wr_id));
632 		}
633 	}
634 
635 	return (0);
636 }
637 
recv_token_cq_recv_event_fn(int fd,int events,void * context)638 static int recv_token_cq_recv_event_fn (int fd, int events, void *context)
639 {
640 	struct totemiba_instance *instance = (struct totemiba_instance *)context;
641 	struct ibv_wc wc[32];
642 	struct ibv_cq *ev_cq;
643 	void *ev_ctx;
644 	int res;
645 	int i;
646 
647 	ibv_get_cq_event (instance->recv_token_recv_completion_channel, &ev_cq, &ev_ctx);
648 	ibv_ack_cq_events (ev_cq, 1);
649 	res = ibv_req_notify_cq (ev_cq, 0);
650 
651 	res = ibv_poll_cq (instance->recv_token_recv_cq, 32, wc);
652 	if (res > 0) {
653 		for (i = 0; i < res; i++) {
654 			iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
655 			recv_token_recv_buf_post (instance, wrid2void(wc[i].wr_id));
656 		}
657 	}
658 
659 	return (0);
660 }
661 
recv_token_accept_destroy(struct totemiba_instance * instance)662 static int recv_token_accept_destroy (struct totemiba_instance *instance)
663 {
664 	if (instance->recv_token_accepted == 0) {
665 		return (0);
666 	}
667 
668 	qb_loop_poll_del (
669 		instance->totemiba_poll_handle,
670 		instance->recv_token_recv_completion_channel->fd);
671 
672 	qb_loop_poll_del (
673 		instance->totemiba_poll_handle,
674 		instance->recv_token_send_completion_channel->fd);
675 
676 	rdma_destroy_qp (instance->recv_token_cma_id);
677 
678 	recv_token_recv_buf_post_destroy (instance);
679 
680 	ibv_destroy_cq (instance->recv_token_send_cq);
681 
682 	ibv_destroy_cq (instance->recv_token_recv_cq);
683 
684 	ibv_destroy_comp_channel (instance->recv_token_send_completion_channel);
685 
686 	ibv_destroy_comp_channel (instance->recv_token_recv_completion_channel);
687 
688 	ibv_dealloc_pd (instance->recv_token_pd);
689 
690 	rdma_destroy_id (instance->recv_token_cma_id);
691 
692 	return (0);
693 }
694 
recv_token_accept_setup(struct totemiba_instance * instance)695 static int recv_token_accept_setup (struct totemiba_instance *instance)
696 {
697 	struct ibv_qp_init_attr init_qp_attr;
698 	int res = 0;
699 
700 	/*
701 	 * Allocate the protection domain
702 	 */
703 	instance->recv_token_pd = ibv_alloc_pd (instance->recv_token_cma_id->verbs);
704 
705 	/*
706 	 * Create a completion channel
707 	 */
708 	instance->recv_token_recv_completion_channel = ibv_create_comp_channel (instance->recv_token_cma_id->verbs);
709 	if (instance->recv_token_recv_completion_channel == NULL) {
710 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion channel");
711 		return (-1);
712 	}
713 
714 	/*
715 	 * Create the completion queue
716 	 */
717 	instance->recv_token_recv_cq = ibv_create_cq (instance->recv_token_cma_id->verbs,
718 		COMPLETION_QUEUE_ENTRIES, instance,
719 		instance->recv_token_recv_completion_channel, 0);
720 	if (instance->recv_token_recv_cq == NULL) {
721 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion queue");
722 		return (-1);
723 	}
724 	res = ibv_req_notify_cq (instance->recv_token_recv_cq, 0);
725 	if (res != 0) {
726 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't request notifications of the completion queue");
727 		return (-1);
728 	}
729 
730 	/*
731 	 * Create a completion channel
732 	 */
733 	instance->recv_token_send_completion_channel = ibv_create_comp_channel (instance->recv_token_cma_id->verbs);
734 	if (instance->recv_token_send_completion_channel == NULL) {
735 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion channel");
736 		return (-1);
737 	}
738 
739 	/*
740 	 * Create the completion queue
741 	 */
742 	instance->recv_token_send_cq = ibv_create_cq (instance->recv_token_cma_id->verbs,
743 		COMPLETION_QUEUE_ENTRIES, instance,
744 		instance->recv_token_send_completion_channel, 0);
745 	if (instance->recv_token_send_cq == NULL) {
746 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion queue");
747 		return (-1);
748 	}
749 	res = ibv_req_notify_cq (instance->recv_token_send_cq, 0);
750 	if (res != 0) {
751 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't request notifications of the completion queue");
752 		return (-1);
753 	}
754 	memset (&init_qp_attr, 0, sizeof (struct ibv_qp_init_attr));
755 	init_qp_attr.cap.max_send_wr = 50;
756 	init_qp_attr.cap.max_recv_wr = TOTAL_READ_POSTS;
757 	init_qp_attr.cap.max_send_sge = 1;
758 	init_qp_attr.cap.max_recv_sge = 1;
759 	init_qp_attr.qp_context = instance;
760 	init_qp_attr.sq_sig_all = 0;
761 	init_qp_attr.qp_type = IBV_QPT_UD;
762 	init_qp_attr.send_cq = instance->recv_token_send_cq;
763 	init_qp_attr.recv_cq = instance->recv_token_recv_cq;
764 	res = rdma_create_qp (instance->recv_token_cma_id, instance->recv_token_pd,
765 		&init_qp_attr);
766 	if (res != 0) {
767 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create queue pair");
768 		return (-1);
769 	}
770 
771 	recv_token_recv_buf_post_initial (instance);
772 
773 	qb_loop_poll_add (
774 		instance->totemiba_poll_handle,
775 		QB_LOOP_MED,
776 		instance->recv_token_recv_completion_channel->fd,
777 		POLLIN, instance, recv_token_cq_recv_event_fn);
778 
779 	qb_loop_poll_add (
780 		instance->totemiba_poll_handle,
781 		QB_LOOP_MED,
782 		instance->recv_token_send_completion_channel->fd,
783 		POLLIN, instance, recv_token_cq_send_event_fn);
784 
785 	instance->recv_token_accepted = 1;
786 
787 	return (res);
788 };
789 
recv_token_rdma_event_fn(int fd,int events,void * context)790 static int recv_token_rdma_event_fn (int fd, int events, void *context)
791 {
792 	struct totemiba_instance *instance = (struct totemiba_instance *)context;
793 	struct rdma_cm_event *event;
794 	struct rdma_conn_param conn_param;
795 
796 	int res;
797 
798 	res = rdma_get_cm_event (instance->listen_recv_token_channel, &event);
799 	if (res != 0) {
800 		return (0);
801 	}
802 
803 	switch (event->event) {
804 	case RDMA_CM_EVENT_CONNECT_REQUEST:
805 		recv_token_accept_destroy (instance);
806 
807 		instance->recv_token_cma_id = event->id;
808 		recv_token_accept_setup (instance);
809 		memset (&conn_param, 0, sizeof (struct rdma_conn_param));
810 		conn_param.qp_num = instance->recv_token_cma_id->qp->qp_num;
811 		res = rdma_accept (instance->recv_token_cma_id, &conn_param);
812 		break;
813 	default:
814 		log_printf (LOGSYS_LEVEL_ERROR, "default %d", event->event);
815 		break;
816 	}
817 
818 	res = rdma_ack_cm_event (event);
819 	return (0);
820 }
821 
send_token_cq_send_event_fn(int fd,int events,void * context)822 static int send_token_cq_send_event_fn (int fd, int events, void *context)
823 {
824 	struct totemiba_instance *instance = (struct totemiba_instance *)context;
825 	struct ibv_wc wc[32];
826 	struct ibv_cq *ev_cq;
827 	void *ev_ctx;
828 	int res;
829 	int i;
830 
831 	ibv_get_cq_event (instance->send_token_send_completion_channel, &ev_cq, &ev_ctx);
832 	ibv_ack_cq_events (ev_cq, 1);
833 	res = ibv_req_notify_cq (ev_cq, 0);
834 
835 	res = ibv_poll_cq (instance->send_token_send_cq, 32, wc);
836 	if (res > 0) {
837 		for (i = 0; i < res; i++) {
838 			token_send_buf_put (instance, wrid2void(wc[i].wr_id));
839 		}
840 	}
841 
842 	return (0);
843 }
844 
send_token_cq_recv_event_fn(int fd,int events,void * context)845 static int send_token_cq_recv_event_fn (int fd, int events, void *context)
846 {
847 	struct totemiba_instance *instance = (struct totemiba_instance *)context;
848 	struct ibv_wc wc[32];
849 	struct ibv_cq *ev_cq;
850 	void *ev_ctx;
851 	int res;
852 	int i;
853 
854 	ibv_get_cq_event (instance->send_token_recv_completion_channel, &ev_cq, &ev_ctx);
855 	ibv_ack_cq_events (ev_cq, 1);
856 	res = ibv_req_notify_cq (ev_cq, 0);
857 
858 	res = ibv_poll_cq (instance->send_token_recv_cq, 32, wc);
859 	if (res > 0) {
860 		for (i = 0; i < res; i++) {
861 			iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
862 		}
863 	}
864 
865 	return (0);
866 }
867 
send_token_rdma_event_fn(int fd,int events,void * context)868 static int send_token_rdma_event_fn (int fd, int events, void *context)
869 {
870 	struct totemiba_instance *instance = (struct totemiba_instance *)context;
871 	struct rdma_cm_event *event;
872 	struct rdma_conn_param conn_param;
873 
874 	int res;
875 
876 	res = rdma_get_cm_event (instance->send_token_channel, &event);
877 	if (res != 0) {
878 		return (0);
879 	}
880 
881 	switch (event->event) {
882 	/*
883 	 * occurs when we resolve the multicast address
884 	 */
885 	case RDMA_CM_EVENT_ADDR_RESOLVED:
886 		res = rdma_resolve_route (instance->send_token_cma_id, 2000);
887 		break;
888 	/*
889 	 * occurs when the CM joins the multicast group
890 	 */
891 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
892 		memset (&conn_param, 0, sizeof (struct rdma_conn_param));
893 		conn_param.private_data = NULL;
894 		conn_param.private_data_len = 0;
895 		res = rdma_connect (instance->send_token_cma_id, &conn_param);
896 		break;
897 	case RDMA_CM_EVENT_ESTABLISHED:
898 		instance->send_token_qpn = event->param.ud.qp_num;
899 		instance->send_token_qkey = event->param.ud.qkey;
900 		instance->send_token_ah = ibv_create_ah (instance->send_token_pd, &event->param.ud.ah_attr);
901 		instance->totemiba_target_set_completed (instance->rrp_context);
902 		break;
903 
904 	case RDMA_CM_EVENT_ADDR_ERROR:
905 	case RDMA_CM_EVENT_ROUTE_ERROR:
906 	case RDMA_CM_EVENT_MULTICAST_ERROR:
907 		log_printf (LOGSYS_LEVEL_ERROR,
908 			"send_token_rdma_event_fn multicast error");
909 		break;
910 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
911 		break;
912 	case RDMA_CM_EVENT_UNREACHABLE:
913 		log_printf (LOGSYS_LEVEL_ERROR,
914 			"send_token_rdma_event_fn unreachable");
915 		break;
916 	default:
917 		log_printf (LOGSYS_LEVEL_ERROR,
918 			"send_token_rdma_event_fn unknown event %d",
919 			event->event);
920 		break;
921 	}
922 
923 	rdma_ack_cm_event (event);
924 	return (0);
925 }
926 
send_token_bind(struct totemiba_instance * instance)927 static int send_token_bind (struct totemiba_instance *instance)
928 {
929 	int res;
930 	struct ibv_qp_init_attr init_qp_attr;
931 
932 	instance->send_token_channel = rdma_create_event_channel();
933 	if (instance->send_token_channel == NULL) {
934 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create rdma channel");
935 		return (-1);
936 	}
937 
938 	res = rdma_create_id (instance->send_token_channel,
939 		&instance->send_token_cma_id, NULL, RDMA_PS_UDP);
940 	if (res) {
941 		log_printf (LOGSYS_LEVEL_ERROR, "error creating send_token_cma_id");
942 		return (-1);
943 	}
944 
945 	res = rdma_bind_addr (instance->send_token_cma_id,
946 		&instance->send_token_bind_addr);
947 	if (res) {
948 		log_printf (LOGSYS_LEVEL_ERROR, "error doing rdma_bind_addr for send token");
949 		return (-1);
950 	}
951 
952 	/*
953 	 * Resolve the send_token address into a GUID
954 	 */
955 	res = rdma_resolve_addr (instance->send_token_cma_id,
956 		&instance->bind_addr, &instance->token_addr, 2000);
957 	if (res) {
958 		log_printf (LOGSYS_LEVEL_ERROR, "error resolving send token address %d %d", res, errno);
959 		return (-1);
960 	}
961 
962 	/*
963 	 * Allocate the protection domain
964 	 */
965 	instance->send_token_pd = ibv_alloc_pd (instance->send_token_cma_id->verbs);
966 
967 	/*
968 	 * Create a completion channel
969 	 */
970 	instance->send_token_recv_completion_channel = ibv_create_comp_channel (instance->send_token_cma_id->verbs);
971 	if (instance->send_token_recv_completion_channel == NULL) {
972 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion channel");
973 		return (-1);
974 	}
975 
976 	/*
977 	 * Create the completion queue
978 	 */
979 	instance->send_token_recv_cq = ibv_create_cq (instance->send_token_cma_id->verbs,
980 		COMPLETION_QUEUE_ENTRIES, instance,
981 		instance->send_token_recv_completion_channel, 0);
982 	if (instance->send_token_recv_cq == NULL) {
983 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion queue");
984 		return (-1);
985 	}
986 	res = ibv_req_notify_cq (instance->send_token_recv_cq, 0);
987 	if (res != 0) {
988 		log_printf (LOGSYS_LEVEL_ERROR,
989 			"couldn't request notifications of the completion queue");
990 		return (-1);
991 	}
992 
993 	/*
994 	 * Create a completion channel
995 	 */
996 	instance->send_token_send_completion_channel =
997 		ibv_create_comp_channel (instance->send_token_cma_id->verbs);
998 
999 	if (instance->send_token_send_completion_channel == NULL) {
1000 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion channel");
1001 		return (-1);
1002 	}
1003 
1004 	/*
1005 	 * Create the completion queue
1006 	 */
1007 	instance->send_token_send_cq = ibv_create_cq (
1008 		instance->send_token_cma_id->verbs,
1009 		COMPLETION_QUEUE_ENTRIES, instance,
1010 		instance->send_token_send_completion_channel, 0);
1011 	if (instance->send_token_send_cq == NULL) {
1012 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion queue");
1013 		return (-1);
1014 	}
1015 
1016 	res = ibv_req_notify_cq (instance->send_token_send_cq, 0);
1017 	if (res != 0) {
1018 		log_printf (LOGSYS_LEVEL_ERROR,
1019 			"couldn't request notifications of the completion queue");
1020 		return (-1);
1021 	}
1022 	memset (&init_qp_attr, 0, sizeof (struct ibv_qp_init_attr));
1023 	init_qp_attr.cap.max_send_wr = 50;
1024 	init_qp_attr.cap.max_recv_wr = TOTAL_READ_POSTS;
1025 	init_qp_attr.cap.max_send_sge = 1;
1026 	init_qp_attr.cap.max_recv_sge = 1;
1027 	init_qp_attr.qp_context = instance;
1028 	init_qp_attr.sq_sig_all = 0;
1029 	init_qp_attr.qp_type = IBV_QPT_UD;
1030 	init_qp_attr.send_cq = instance->send_token_send_cq;
1031 	init_qp_attr.recv_cq = instance->send_token_recv_cq;
1032 	res = rdma_create_qp (instance->send_token_cma_id,
1033 		instance->send_token_pd, &init_qp_attr);
1034 	if (res != 0) {
1035 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create queue pair");
1036 		return (-1);
1037 	}
1038 
1039 	qb_loop_poll_add (
1040 		instance->totemiba_poll_handle,
1041 		QB_LOOP_MED,
1042 		instance->send_token_recv_completion_channel->fd,
1043 		POLLIN, instance, send_token_cq_recv_event_fn);
1044 
1045 	qb_loop_poll_add (
1046 		instance->totemiba_poll_handle,
1047 		QB_LOOP_MED,
1048 		instance->send_token_send_completion_channel->fd,
1049 		POLLIN, instance, send_token_cq_send_event_fn);
1050 
1051 	qb_loop_poll_add (
1052 		instance->totemiba_poll_handle,
1053 		QB_LOOP_MED,
1054 		instance->send_token_channel->fd,
1055 		POLLIN, instance, send_token_rdma_event_fn);
1056 
1057 	instance->send_token_bound = 1;
1058 	return (0);
1059 }
1060 
send_token_unbind(struct totemiba_instance * instance)1061 static int send_token_unbind (struct totemiba_instance *instance)
1062 {
1063 	if (instance->send_token_bound == 0) {
1064 		return (0);
1065 	}
1066 
1067 	qb_loop_poll_del (
1068 		instance->totemiba_poll_handle,
1069 		instance->send_token_recv_completion_channel->fd);
1070 	qb_loop_poll_del (
1071 		instance->totemiba_poll_handle,
1072 		instance->send_token_send_completion_channel->fd);
1073 	qb_loop_poll_del (
1074 		instance->totemiba_poll_handle,
1075 		instance->send_token_channel->fd);
1076 
1077 	if(instance->send_token_ah)
1078 	{
1079 		ibv_destroy_ah(instance->send_token_ah);
1080 		instance->send_token_ah = 0;
1081 	}
1082 
1083 	rdma_destroy_qp (instance->send_token_cma_id);
1084 	ibv_destroy_cq (instance->send_token_send_cq);
1085 	ibv_destroy_cq (instance->send_token_recv_cq);
1086 	ibv_destroy_comp_channel (instance->send_token_send_completion_channel);
1087 	ibv_destroy_comp_channel (instance->send_token_recv_completion_channel);
1088 	token_send_buf_destroy (instance);
1089 	ibv_dealloc_pd (instance->send_token_pd);
1090 	rdma_destroy_id (instance->send_token_cma_id);
1091 	rdma_destroy_event_channel (instance->send_token_channel);
1092 	return (0);
1093 }
1094 
recv_token_bind(struct totemiba_instance * instance)1095 static int recv_token_bind (struct totemiba_instance *instance)
1096 {
1097 	int res;
1098 	struct ibv_port_attr port_attr;
1099 
1100 	instance->listen_recv_token_channel = rdma_create_event_channel();
1101 	if (instance->listen_recv_token_channel == NULL) {
1102 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create rdma channel");
1103 		return (-1);
1104 	}
1105 
1106 	res = rdma_create_id (instance->listen_recv_token_channel,
1107 		&instance->listen_recv_token_cma_id, NULL, RDMA_PS_UDP);
1108 	if (res) {
1109 		log_printf (LOGSYS_LEVEL_ERROR, "error creating recv_token_cma_id");
1110 		return (-1);
1111 	}
1112 
1113 	res = rdma_bind_addr (instance->listen_recv_token_cma_id,
1114 		&instance->bind_addr);
1115 	if (res) {
1116 		log_printf (LOGSYS_LEVEL_ERROR, "error doing rdma_bind_addr for recv token");
1117 		return (-1);
1118 	}
1119 
1120 	/*
1121 	 * Determine active_mtu of port and compare it with the configured one (160 is aproximation of all totem
1122 	 * structures.
1123 	 *
1124 	 * TODO: Implement MTU discovery also for IP and handle MTU correctly for all structures inside totemsrp,
1125 	 *       crypto, ...
1126 	 */
1127 	res = ibv_query_port (instance->listen_recv_token_cma_id->verbs, instance->listen_recv_token_cma_id->port_num, &port_attr);
1128 	if ( (1 << (port_attr.active_mtu + 7)) < instance->totem_config->net_mtu + 160) {
1129 		log_printf (LOGSYS_LEVEL_ERROR, "requested net_mtu is %d and is larger than the active port mtu %d\n",\
1130 				instance->totem_config->net_mtu + 160, (1 << (port_attr.active_mtu + 7)));
1131 		return (-1);
1132 	}
1133 
1134 	/*
1135 	 * Resolve the recv_token address into a GUID
1136 	 */
1137 	res = rdma_listen (instance->listen_recv_token_cma_id, 10);
1138 	if (res) {
1139 		log_printf (LOGSYS_LEVEL_ERROR, "error listening %d %d", res, errno);
1140 		return (-1);
1141 	}
1142 
1143 	qb_loop_poll_add (
1144 		instance->totemiba_poll_handle,
1145 		QB_LOOP_MED,
1146 		instance->listen_recv_token_channel->fd,
1147 		POLLIN, instance, recv_token_rdma_event_fn);
1148 
1149 	return (0);
1150 }
1151 
mcast_bind(struct totemiba_instance * instance)1152 static int mcast_bind (struct totemiba_instance *instance)
1153 {
1154 	int res;
1155 	struct ibv_qp_init_attr init_qp_attr;
1156 
1157 	instance->mcast_channel = rdma_create_event_channel();
1158 	if (instance->mcast_channel == NULL) {
1159 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create rdma channel");
1160 		return (-1);
1161 	}
1162 
1163 	res = rdma_create_id (instance->mcast_channel, &instance->mcast_cma_id, NULL, RDMA_PS_UDP);
1164 	if (res) {
1165 		log_printf (LOGSYS_LEVEL_ERROR, "error creating mcast_cma_id");
1166 		return (-1);
1167 	}
1168 
1169 	res = rdma_bind_addr (instance->mcast_cma_id, &instance->local_mcast_bind_addr);
1170 	if (res) {
1171 		log_printf (LOGSYS_LEVEL_ERROR, "error doing rdma_bind_addr for mcast");
1172 		return (-1);
1173 	}
1174 
1175 	/*
1176 	 * Resolve the multicast address into a GUID
1177 	 */
1178 	res = rdma_resolve_addr (instance->mcast_cma_id, &instance->local_mcast_bind_addr,
1179 		&instance->mcast_addr, 5000);
1180 	if (res) {
1181 		log_printf (LOGSYS_LEVEL_ERROR, "error resolving multicast address %d %d", res, errno);
1182 		return (-1);
1183 	}
1184 
1185 	/*
1186 	 * Allocate the protection domain
1187 	 */
1188 	instance->mcast_pd = ibv_alloc_pd (instance->mcast_cma_id->verbs);
1189 
1190 	/*
1191 	 * Create a completion channel
1192 	 */
1193 	instance->mcast_recv_completion_channel = ibv_create_comp_channel (instance->mcast_cma_id->verbs);
1194 	if (instance->mcast_recv_completion_channel == NULL) {
1195 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion channel");
1196 		return (-1);
1197 	}
1198 
1199 	/*
1200 	 * Create the completion queue
1201 	 */
1202 	instance->mcast_recv_cq = ibv_create_cq (instance->mcast_cma_id->verbs,
1203 		COMPLETION_QUEUE_ENTRIES, instance,
1204 		instance->mcast_recv_completion_channel, 0);
1205 	if (instance->mcast_recv_cq == NULL) {
1206 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion queue");
1207 		return (-1);
1208 	}
1209 	res = ibv_req_notify_cq (instance->mcast_recv_cq, 0);
1210 	if (res != 0) {
1211 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't request notifications of the completion queue");
1212 		return (-1);
1213 	}
1214 
1215 	/*
1216 	 * Create a completion channel
1217 	 */
1218 	instance->mcast_send_completion_channel = ibv_create_comp_channel (instance->mcast_cma_id->verbs);
1219 	if (instance->mcast_send_completion_channel == NULL) {
1220 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion channel");
1221 		return (-1);
1222 	}
1223 
1224 	/*
1225 	 * Create the completion queue
1226 	 */
1227 	instance->mcast_send_cq = ibv_create_cq (instance->mcast_cma_id->verbs,
1228 		COMPLETION_QUEUE_ENTRIES, instance,
1229 		instance->mcast_send_completion_channel, 0);
1230 	if (instance->mcast_send_cq == NULL) {
1231 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion queue");
1232 		return (-1);
1233 	}
1234 	res = ibv_req_notify_cq (instance->mcast_send_cq, 0);
1235 	if (res != 0) {
1236 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't request notifications of the completion queue");
1237 		return (-1);
1238 	}
1239 	memset (&init_qp_attr, 0, sizeof (struct ibv_qp_init_attr));
1240 	init_qp_attr.cap.max_send_wr = 50;
1241 	init_qp_attr.cap.max_recv_wr = TOTAL_READ_POSTS;
1242 	init_qp_attr.cap.max_send_sge = 1;
1243 	init_qp_attr.cap.max_recv_sge = 1;
1244 	init_qp_attr.qp_context = instance;
1245 	init_qp_attr.sq_sig_all = 0;
1246 	init_qp_attr.qp_type = IBV_QPT_UD;
1247 	init_qp_attr.send_cq = instance->mcast_send_cq;
1248 	init_qp_attr.recv_cq = instance->mcast_recv_cq;
1249 	res = rdma_create_qp (instance->mcast_cma_id, instance->mcast_pd,
1250 		&init_qp_attr);
1251 	if (res != 0) {
1252 		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create queue pair");
1253 		return (-1);
1254 	}
1255 
1256 	mcast_recv_buf_post_initial (instance);
1257 
1258 	qb_loop_poll_add (
1259 		instance->totemiba_poll_handle,
1260 		QB_LOOP_MED,
1261 		instance->mcast_recv_completion_channel->fd,
1262 		POLLIN, instance, mcast_cq_recv_event_fn);
1263 
1264 	qb_loop_poll_add (
1265 		instance->totemiba_poll_handle,
1266 		QB_LOOP_MED,
1267 		instance->mcast_send_completion_channel->fd,
1268 		POLLIN, instance, mcast_cq_send_event_fn);
1269 
1270 	qb_loop_poll_add (
1271 		instance->totemiba_poll_handle,
1272 		QB_LOOP_MED,
1273 		instance->mcast_channel->fd,
1274 		POLLIN, instance, mcast_rdma_event_fn);
1275 
1276 	return (0);
1277 }
1278 
timer_function_netif_check_timeout(void * data)1279 static void timer_function_netif_check_timeout (
1280       void *data)
1281 {
1282 	struct totemiba_instance *instance = (struct totemiba_instance *)data;
1283 	int res;
1284 	int interface_up;
1285 	int interface_num;
1286 	int addr_len;
1287 
1288 	totemip_iface_check (&instance->totem_interface->bindnet,
1289 		&instance->totem_interface->boundto, &interface_up, &interface_num, instance->totem_config->clear_node_high_bit);
1290 
1291 	totemip_totemip_to_sockaddr_convert(&instance->totem_interface->boundto,
1292 		instance->totem_interface->ip_port, (struct sockaddr_storage *)&instance->bind_addr,
1293 		&addr_len);
1294 
1295 	totemip_totemip_to_sockaddr_convert(&instance->totem_interface->boundto,
1296 		0, (struct sockaddr_storage *)&instance->send_token_bind_addr,
1297 		&addr_len);
1298 
1299 	totemip_totemip_to_sockaddr_convert(&instance->totem_interface->boundto,
1300 		0, (struct sockaddr_storage *)&instance->local_mcast_bind_addr,
1301 		&addr_len);
1302 
1303 	totemip_totemip_to_sockaddr_convert(&instance->totem_interface->boundto,
1304 		instance->totem_interface->ip_port, (struct sockaddr_storage *)&instance->my_id,
1305 		&addr_len);
1306 
1307 	totemip_sockaddr_to_totemip_convert(
1308 		(const struct sockaddr_storage *)&instance->bind_addr,
1309 		&instance->my_id);
1310 
1311 	memcpy (&instance->my_id, &instance->totem_interface->boundto,
1312 		sizeof (struct totem_ip_address));
1313 
1314 	totemip_totemip_to_sockaddr_convert(&instance->totem_interface->mcast_addr,
1315 		instance->totem_interface->ip_port,
1316 		(struct sockaddr_storage *)&instance->mcast_addr, &addr_len);
1317 
1318 	res = recv_token_bind (instance);
1319 
1320 	res = mcast_bind (instance);
1321 }
1322 
/*
 * Accepts a cipher and a hash type name but does not act on either of
 * them.  The context is not touched.  Always returns 0.
 */
int totemiba_crypto_set (
	void *iba_context,
	const char *cipher_type,
	const char *hash_type)
{
	(void)iba_context;
	(void)cipher_type;
	(void)hash_type;

	return (0);
}
1335 
/*
 * Placeholder for instance shutdown.  Nothing belonging to the instance is
 * released here; the context is ignored.  Always returns 0.
 */
int totemiba_finalize (
	void *iba_context)
{
	(void)iba_context;

	return (0);
}
1346 
1347 /*
1348  * Create an instance
1349  */
totemiba_initialize(qb_loop_t * qb_poll_handle,void ** iba_context,struct totem_config * totem_config,totemsrp_stats_t * stats,int interface_no,void * context,void (* deliver_fn)(void * context,const void * msg,unsigned int msg_len),void (* iface_change_fn)(void * context,const struct totem_ip_address * iface_address),void (* target_set_completed)(void * context))1350 int totemiba_initialize (
1351 	qb_loop_t *qb_poll_handle,
1352 	void **iba_context,
1353 	struct totem_config *totem_config,
1354 	totemsrp_stats_t *stats,
1355 	int interface_no,
1356 	void *context,
1357 
1358 	void (*deliver_fn) (
1359 		void *context,
1360 		const void *msg,
1361 		unsigned int msg_len),
1362 
1363 	void (*iface_change_fn) (
1364 		void *context,
1365 		const struct totem_ip_address *iface_address),
1366 
1367 	void (*target_set_completed) (
1368 		void *context))
1369 {
1370 	struct totemiba_instance *instance;
1371 	int res = 0;
1372 
1373 	instance = malloc (sizeof (struct totemiba_instance));
1374 	if (instance == NULL) {
1375 		return (-1);
1376 	}
1377 
1378 	totemiba_instance_initialize (instance);
1379 
1380 	instance->totem_interface = &totem_config->interfaces[interface_no];
1381 
1382 	instance->totemiba_poll_handle = qb_poll_handle;
1383 
1384 	instance->totem_interface->bindnet.nodeid = totem_config->node_id;
1385 
1386 	instance->totemiba_deliver_fn = deliver_fn;
1387 
1388 	instance->totemiba_target_set_completed = target_set_completed;
1389 
1390 	instance->totemiba_iface_change_fn = iface_change_fn;
1391 
1392 	instance->totem_config = totem_config;
1393 	instance->stats = stats;
1394 
1395 	instance->rrp_context = context;
1396 
1397 	qb_loop_timer_add (instance->totemiba_poll_handle,
1398 		QB_LOOP_MED,
1399 		100*QB_TIME_NS_IN_MSEC,
1400 		(void *)instance,
1401 		timer_function_netif_check_timeout,
1402 		&instance->timer_netif_check_timeout);
1403 
1404 	instance->totemiba_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;
1405 	instance->totemiba_log_printf = totem_config->totem_logging_configuration.log_printf;
1406 
1407 	*iba_context = instance;
1408 	return (res);
1409 }
1410 
totemiba_buffer_alloc(void)1411 void *totemiba_buffer_alloc (void)
1412 {
1413 	return malloc (MAX_MTU_SIZE);
1414 }
1415 
/*
 * Release a buffer by passing it to free.
 *
 * ptr may be NULL, in which case nothing happens.
 */
void totemiba_buffer_release (void *ptr)
{
	/*
	 * A void function must not return an expression, so free is called as
	 * a plain statement.
	 */
	free (ptr);
}
1420 
/*
 * Accepts a processor count but does not use it; the context is ignored as
 * well.  Always returns 0.
 */
int totemiba_processor_count_set (
	void *iba_context,
	int processor_count)
{
	(void)iba_context;
	(void)processor_count;

	return (0);
}
1432 
/*
 * Receive flush entry point.  Performs no work and ignores the context.
 * Always returns 0.
 */
int totemiba_recv_flush (void *iba_context)
{
	(void)iba_context;

	return (0);
}
1442 
/*
 * Send flush entry point.  Performs no work and ignores the context.
 * Always returns 0.
 */
int totemiba_send_flush (void *iba_context)
{
	(void)iba_context;

	return (0);
}
1452 
totemiba_token_send(void * iba_context,const void * ms,unsigned int msg_len)1453 int totemiba_token_send (
1454 	void *iba_context,
1455 	const void *ms,
1456 	unsigned int msg_len)
1457 {
1458 	struct totemiba_instance *instance = (struct totemiba_instance *)iba_context;
1459 	int res = 0;
1460 	struct ibv_send_wr send_wr, *failed_send_wr;
1461 	struct ibv_sge sge;
1462 	void *msg;
1463 	struct send_buf *send_buf;
1464 
1465 	send_buf = token_send_buf_get (instance);
1466 	if (send_buf == NULL) {
1467 		return (-1);
1468 	}
1469 	msg = send_buf->buffer;
1470 	memcpy (msg, ms, msg_len);
1471 
1472 	send_wr.next = NULL;
1473 	send_wr.sg_list = &sge;
1474 	send_wr.num_sge = 1;
1475 	send_wr.opcode = IBV_WR_SEND;
1476 	send_wr.send_flags = IBV_SEND_SIGNALED;
1477 	send_wr.wr_id = void2wrid(send_buf);
1478 	send_wr.imm_data = 0;
1479 	send_wr.wr.ud.ah = instance->send_token_ah;
1480 	send_wr.wr.ud.remote_qpn = instance->send_token_qpn;
1481 	send_wr.wr.ud.remote_qkey = instance->send_token_qkey;
1482 
1483 	sge.length = msg_len;
1484 	sge.lkey = send_buf->mr->lkey;
1485 	sge.addr = (uintptr_t)msg;
1486 
1487 	if(instance->send_token_ah != 0 && instance->send_token_bound)
1488 		res = ibv_post_send (instance->send_token_cma_id->qp, &send_wr, &failed_send_wr);
1489 
1490 	return (res);
1491 }
1492 
totemiba_mcast_flush_send(void * iba_context,const void * ms,unsigned int msg_len)1493 int totemiba_mcast_flush_send (
1494 	void *iba_context,
1495 	const void *ms,
1496 	unsigned int msg_len)
1497 {
1498 	struct totemiba_instance *instance = (struct totemiba_instance *)iba_context;
1499 	int res = 0;
1500 	struct ibv_send_wr send_wr, *failed_send_wr;
1501 	struct ibv_sge sge;
1502 	void *msg;
1503 	struct send_buf *send_buf;
1504 
1505 	send_buf = mcast_send_buf_get (instance);
1506 	if (send_buf == NULL) {
1507 		return (-1);
1508 	}
1509 
1510 	msg = send_buf->buffer;
1511 	memcpy (msg, ms, msg_len);
1512 	send_wr.next = NULL;
1513 	send_wr.sg_list = &sge;
1514 	send_wr.num_sge = 1;
1515 	send_wr.opcode = IBV_WR_SEND;
1516 	send_wr.send_flags = IBV_SEND_SIGNALED;
1517 	send_wr.wr_id = void2wrid(send_buf);
1518 	send_wr.imm_data = 0;
1519 	send_wr.wr.ud.ah = instance->mcast_ah;
1520 	send_wr.wr.ud.remote_qpn = instance->mcast_qpn;
1521 	send_wr.wr.ud.remote_qkey = instance->mcast_qkey;
1522 
1523 	sge.length = msg_len;
1524 	sge.lkey = send_buf->mr->lkey;
1525 	sge.addr = (uintptr_t)msg;
1526 
1527 	if (instance->mcast_ah != 0) {
1528 		res = ibv_post_send (instance->mcast_cma_id->qp, &send_wr, &failed_send_wr);
1529 	}
1530 
1531 	return (res);
1532 }
1533 
totemiba_mcast_noflush_send(void * iba_context,const void * ms,unsigned int msg_len)1534 int totemiba_mcast_noflush_send (
1535 	void *iba_context,
1536 	const void *ms,
1537 	unsigned int msg_len)
1538 {
1539 	struct totemiba_instance *instance = (struct totemiba_instance *)iba_context;
1540 	int res = 0;
1541 	struct ibv_send_wr send_wr, *failed_send_wr;
1542 	struct ibv_sge sge;
1543 	void *msg;
1544 	struct send_buf *send_buf;
1545 
1546 	send_buf = mcast_send_buf_get (instance);
1547 	if (send_buf == NULL) {
1548 		return (-1);
1549 	}
1550 
1551 	msg = send_buf->buffer;
1552 	memcpy (msg, ms, msg_len);
1553 	send_wr.next = NULL;
1554 	send_wr.sg_list = &sge;
1555 	send_wr.num_sge = 1;
1556 	send_wr.opcode = IBV_WR_SEND;
1557 	send_wr.send_flags = IBV_SEND_SIGNALED;
1558 	send_wr.wr_id = void2wrid(send_buf);
1559 	send_wr.imm_data = 0;
1560 	send_wr.wr.ud.ah = instance->mcast_ah;
1561 	send_wr.wr.ud.remote_qpn = instance->mcast_qpn;
1562 	send_wr.wr.ud.remote_qkey = instance->mcast_qkey;
1563 
1564 	sge.length = msg_len;
1565 	sge.lkey = send_buf->mr->lkey;
1566 	sge.addr = (uintptr_t)msg;
1567 
1568 	if (instance->mcast_ah != 0) {
1569 		res = ibv_post_send (instance->mcast_cma_id->qp, &send_wr, &failed_send_wr);
1570 	}
1571 
1572 	return (res);
1573 }
1574 
/*
 * Interface check entry point.  Performs no work and ignores the context.
 * Always returns 0.
 */
extern int totemiba_iface_check (void *iba_context)
{
	(void)iba_context;

	return (0);
}
1584 
/*
 * MTU adjustment entry point.  Neither the context nor the configuration
 * is read or modified.
 */
extern void totemiba_net_mtu_adjust (void *iba_context, struct totem_config *totem_config)
{
	(void)iba_context;
	(void)totem_config;
}
1590 
totemiba_iface_print(void * iba_context)1591 const char *totemiba_iface_print (void *iba_context)  {
1592 	struct totemiba_instance *instance = (struct totemiba_instance *)iba_context;
1593 
1594         const char *ret_char;
1595 
1596         ret_char = totemip_print (&instance->my_id);
1597 
1598         return (ret_char);
1599 }
1600 
totemiba_iface_get(void * iba_context,struct totem_ip_address * addr)1601 int totemiba_iface_get (
1602 	void *iba_context,
1603 	struct totem_ip_address *addr)
1604 {
1605 	struct totemiba_instance *instance = (struct totemiba_instance *)iba_context;
1606 	int res = 0;
1607 
1608 	memcpy (addr, &instance->my_id, sizeof (struct totem_ip_address));
1609 
1610 	return (res);
1611 }
1612 
totemiba_token_target_set(void * iba_context,const struct totem_ip_address * token_target)1613 int totemiba_token_target_set (
1614 	void *iba_context,
1615 	const struct totem_ip_address *token_target)
1616 {
1617 	struct totemiba_instance *instance = (struct totemiba_instance *)iba_context;
1618 	int res = 0;
1619 	int addr_len = 16;
1620 
1621 	totemip_totemip_to_sockaddr_convert((struct totem_ip_address *)token_target,
1622 		instance->totem_interface->ip_port, (struct sockaddr_storage *)&instance->token_addr,
1623 		&addr_len);
1624 
1625 	res = send_token_unbind (instance);
1626 
1627 	res = send_token_bind (instance);
1628 
1629 	return (res);
1630 }
1631 
/*
 * Multicast receive-queue emptiness entry point.  Performs no work and
 * ignores the context.  Always returns 0.
 */
extern int totemiba_recv_mcast_empty (
	void *iba_context)
{
	(void)iba_context;

	return (0);
}
1642 
1643