/*
 * librpma_gpspm: IO engine that uses PMDK librpma to write data,
 *		based on General Purpose Server Persistency Method
 *
 * Copyright 2020-2021, Intel Corporation
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License,
 * version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include "librpma_fio.h"

#include <libpmem.h>

/* Generated by the protocol buffer compiler from: librpma_gpspm_flush.proto */
#include "librpma_gpspm_flush.pb-c.h"

#define MAX_MSG_SIZE (512)
#define IO_U_BUF_LEN (2 * MAX_MSG_SIZE)
#define SEND_OFFSET (0)
#define RECV_OFFSET (SEND_OFFSET + MAX_MSG_SIZE)
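
/*
 * Each io_u message buffer (IO_U_BUF_LEN bytes) holds a SEND/RECV pair:
 * bytes [0, MAX_MSG_SIZE) carry the flush request (SEND) and
 * bytes [MAX_MSG_SIZE, 2 * MAX_MSG_SIZE) carry the flush response (RECV).
 */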

#define GPSPM_FLUSH_REQUEST__LAST \
	{ PROTOBUF_C_MESSAGE_INIT(&gpspm_flush_request__descriptor), 0, 0, 0 }

/*
 * 'Flush_req_last' is the last flush request the client has to send
 * to the server to indicate that the client is done.
 */
static const GPSPMFlushRequest Flush_req_last = GPSPM_FLUSH_REQUEST__LAST;

#define IS_NOT_THE_LAST_MESSAGE(flush_req) \
	(flush_req->length != Flush_req_last.length || \
	flush_req->offset != Flush_req_last.offset)

/* client side implementation */

/* get the next io_u message buffer in a round-robin fashion */
#define IO_U_NEXT_BUF_OFF_CLIENT(cd) \
	(IO_U_BUF_LEN * ((cd->msg_curr++) % cd->msg_num))
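
/*
 * E.g. with msg_num == 4, successive calls yield the offsets
 * 0, 1024, 2048, 3072, 0, ... (IO_U_BUF_LEN == 1024).
 */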

struct client_data {
	/* memory for sending and receiving buffered messages */
	char *io_us_msgs;

	/* resources for the messaging buffer */
	uint32_t msg_num;
	uint32_t msg_curr;
	struct rpma_mr_local *msg_mr;
};

static inline int client_io_flush(struct thread_data *td,
		struct io_u *first_io_u, struct io_u *last_io_u,
		unsigned long long int len);

static int client_get_io_u_index(struct rpma_completion *cmpl,
		unsigned int *io_u_index);

static int client_init(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd;
	struct client_data *cd;
	uint32_t write_num;
	struct rpma_conn_cfg *cfg = NULL;
	int ret;

	/*
	 * not supported:
	 * - readwrite = read / trim / randread / randtrim /
	 *               rw / randrw / trimwrite
	 */
	if (td_read(td) || td_trim(td)) {
		td_verror(td, EINVAL, "Not supported mode.");
		return -1;
	}

	/* allocate client's data */
	cd = calloc(1, sizeof(*cd));
	if (cd == NULL) {
		td_verror(td, errno, "calloc");
		return -1;
	}

	/*
	 * Calculate the required number of WRITEs and FLUSHes.
	 *
	 * Note: Each flush is a request (SEND) and response (RECV) pair.
	 */
	if (td_random(td)) {
		write_num = td->o.iodepth; /* WRITE * N */
		cd->msg_num = td->o.iodepth; /* FLUSH * N */
	} else {
		if (td->o.sync_io) {
			write_num = 1; /* WRITE */
			cd->msg_num = 1; /* FLUSH */
		} else {
			write_num = td->o.iodepth; /* WRITE * N */
			/*
			 * FLUSH * B where:
			 * - B == ceil(iodepth / iodepth_batch)
			 *   which is the number of batches for N writes
			 */
			cd->msg_num = LIBRPMA_FIO_CEIL(td->o.iodepth,
					td->o.iodepth_batch);
		}
	}
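	/*
	 * E.g. a sequential asynchronous job with iodepth=8 and
	 * iodepth_batch=2 requires write_num == 8 WRITEs and
	 * cd->msg_num == ceil(8 / 2) == 4 FLUSH messages.
	 */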

	/* create a connection configuration object */
	if ((ret = rpma_conn_cfg_new(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_new");
		goto err_free_cd;
	}

	/*
	 * Calculate the required queue sizes where:
	 * - the send queue (SQ) has to be big enough to accommodate
	 *   all io_us (WRITEs) and all flush requests (SENDs)
	 * - the receive queue (RQ) has to be big enough to accommodate
	 *   all flush responses (RECVs)
	 * - the completion queue (CQ) has to be big enough to accommodate all
	 *   success and error completions (sq_size + rq_size)
	 */
	if ((ret = rpma_conn_cfg_set_sq_size(cfg, write_num + cd->msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_rq_size(cfg, cd->msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_cq_size(cfg, write_num + cd->msg_num * 2))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
		goto err_cfg_delete;
	}
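	/*
	 * Continuing the example above (write_num == 8, msg_num == 4):
	 * sq_size == 8 + 4 == 12, rq_size == 4 and
	 * cq_size == 8 + 4 * 2 == 16.
	 */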

	if (librpma_fio_client_init(td, cfg))
		goto err_cfg_delete;

	ccd = td->io_ops_data;

	if (ccd->ws->direct_write_to_pmem &&
	    ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT &&
	    td->thread_number == 1) {
		/* XXX log_info mixes with the JSON output */
		log_err(
			"Note: The server side supports Direct Write to PMem and it is equipped with PMem (direct_write_to_pmem).\n"
			"You can use librpma_client and librpma_server engines for better performance instead of GPSPM.\n");
	}

	/* validate the server's RQ capacity */
	if (cd->msg_num > ccd->ws->max_msg_num) {
		log_err(
			"server's RQ size (iodepth) too small to handle the client's workspace requirements (%u < %u)\n",
			ccd->ws->max_msg_num, cd->msg_num);
		goto err_cleanup_common;
	}

	if ((ret = rpma_conn_cfg_delete(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
		/* non-fatal error - continue */
	}

	ccd->flush = client_io_flush;
	ccd->get_io_u_index = client_get_io_u_index;
	ccd->client_data = cd;

	return 0;

err_cleanup_common:
	librpma_fio_client_cleanup(td);

err_cfg_delete:
	(void) rpma_conn_cfg_delete(&cfg);

err_free_cd:
	free(cd);

	return -1;
}

static int client_post_init(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct client_data *cd = ccd->client_data;
	unsigned int io_us_msgs_size;
	int ret;

	/* message buffers initialization and registration */
	io_us_msgs_size = cd->msg_num * IO_U_BUF_LEN;
	if ((ret = posix_memalign((void **)&cd->io_us_msgs, page_size,
			io_us_msgs_size))) {
		td_verror(td, ret, "posix_memalign");
		return ret;
	}
	if ((ret = rpma_mr_reg(ccd->peer, cd->io_us_msgs, io_us_msgs_size,
			RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
			&cd->msg_mr))) {
		librpma_td_verror(td, ret, "rpma_mr_reg");
		return ret;
	}

	return librpma_fio_client_post_init(td);
}

static void client_cleanup(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct client_data *cd;
	size_t flush_req_size;
	size_t io_u_buf_off;
	size_t send_offset;
	void *send_ptr;
	int ret;

	if (ccd == NULL)
		return;

	cd = ccd->client_data;
	if (cd == NULL) {
		librpma_fio_client_cleanup(td);
		return;
	}

	/*
	 * Make sure all SEND completions are collected so there are free
	 * slots in the SQ for the last SEND message.
	 *
	 * Note: Even if any operation fails, we can still send the
	 * termination notice.
	 */
	(void) librpma_fio_client_io_complete_all_sends(td);

	/* prepare the last flush message and pack it to the send buffer */
	flush_req_size = gpspm_flush_request__get_packed_size(&Flush_req_last);
	if (flush_req_size > MAX_MSG_SIZE) {
		log_err(
			"Packed flush request size is bigger than available send buffer space (%zu > %d)\n",
			flush_req_size, MAX_MSG_SIZE);
	} else {
		io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
		send_offset = io_u_buf_off + SEND_OFFSET;
		send_ptr = cd->io_us_msgs + send_offset;
		(void) gpspm_flush_request__pack(&Flush_req_last, send_ptr);

		/* send the flush message */
		if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset,
				flush_req_size, RPMA_F_COMPLETION_ALWAYS,
				NULL)))
			librpma_td_verror(td, ret, "rpma_send");

		++ccd->op_send_posted;

		/* wait for the SEND to complete */
		(void) librpma_fio_client_io_complete_all_sends(td);
	}

	/* deregister the messaging buffer memory */
	if ((ret = rpma_mr_dereg(&cd->msg_mr)))
		librpma_td_verror(td, ret, "rpma_mr_dereg");

	free(ccd->client_data);

	librpma_fio_client_cleanup(td);
}

static inline int client_io_flush(struct thread_data *td,
		struct io_u *first_io_u, struct io_u *last_io_u,
		unsigned long long int len)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct client_data *cd = ccd->client_data;
	size_t io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
	size_t send_offset = io_u_buf_off + SEND_OFFSET;
	size_t recv_offset = io_u_buf_off + RECV_OFFSET;
	void *send_ptr = cd->io_us_msgs + send_offset;
	void *recv_ptr = cd->io_us_msgs + recv_offset;
	GPSPMFlushRequest flush_req = GPSPM_FLUSH_REQUEST__INIT;
	size_t flush_req_size = 0;
	int ret;

	/* prepare a response buffer */
	if ((ret = rpma_recv(ccd->conn, cd->msg_mr, recv_offset, MAX_MSG_SIZE,
			recv_ptr))) {
		librpma_td_verror(td, ret, "rpma_recv");
		return -1;
	}

	/* prepare a flush message and pack it to a send buffer */
	flush_req.offset = first_io_u->offset;
	flush_req.length = len;
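	/*
	 * The index of the last io_u travels in op_context; the server
	 * echoes it back in the flush response so client_get_io_u_index()
	 * can match the completion with the right io_u.
	 */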
	flush_req.op_context = last_io_u->index;
	flush_req_size = gpspm_flush_request__get_packed_size(&flush_req);
	if (flush_req_size > MAX_MSG_SIZE) {
		log_err(
			"Packed flush request size is bigger than available send buffer space (%zu > %d)\n",
			flush_req_size, MAX_MSG_SIZE);
		return -1;
	}
	(void) gpspm_flush_request__pack(&flush_req, send_ptr);

	/* send the flush message */
	if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset, flush_req_size,
			RPMA_F_COMPLETION_ALWAYS, NULL))) {
		librpma_td_verror(td, ret, "rpma_send");
		return -1;
	}

	++ccd->op_send_posted;

	return 0;
}

static int client_get_io_u_index(struct rpma_completion *cmpl,
		unsigned int *io_u_index)
{
	GPSPMFlushResponse *flush_resp;

	if (cmpl->op != RPMA_OP_RECV)
		return 0;

	/* unpack a response from the received buffer */
	flush_resp = gpspm_flush_response__unpack(NULL,
			cmpl->byte_len, cmpl->op_context);
	if (flush_resp == NULL) {
		log_err("Cannot unpack the flush response buffer\n");
		return -1;
	}

	memcpy(io_u_index, &flush_resp->op_context, sizeof(*io_u_index));

	gpspm_flush_response__free_unpacked(flush_resp, NULL);

	return 1;
}

FIO_STATIC struct ioengine_ops ioengine_client = {
	.name			= "librpma_gpspm_client",
	.version		= FIO_IOOPS_VERSION,
	.init			= client_init,
	.post_init		= client_post_init,
	.get_file_size		= librpma_fio_client_get_file_size,
	.open_file		= librpma_fio_file_nop,
	.queue			= librpma_fio_client_queue,
	.commit			= librpma_fio_client_commit,
	.getevents		= librpma_fio_client_getevents,
	.event			= librpma_fio_client_event,
	.errdetails		= librpma_fio_client_errdetails,
	.close_file		= librpma_fio_file_nop,
	.cleanup		= client_cleanup,
	.flags			= FIO_DISKLESSIO,
	.options		= librpma_fio_options,
	.option_struct_size	= sizeof(struct librpma_fio_options_values),
};

/* server side implementation */

#define IO_U_BUFF_OFF_SERVER(i) ((i) * IO_U_BUF_LEN)
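
/*
 * The server-side io_u buffers use the same SEND/RECV layout as the
 * client side: message buffer i occupies bytes
 * [i * IO_U_BUF_LEN, (i + 1) * IO_U_BUF_LEN).
 */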

struct server_data {
	/* aligned td->orig_buffer */
	char *orig_buffer_aligned;

	/* resources for messaging buffer from DRAM allocated by fio */
	struct rpma_mr_local *msg_mr;

	uint32_t msg_sqe_available; /* # of free SQ slots */

	/* in-memory queues */
	struct rpma_completion *msgs_queued;
	uint32_t msg_queued_nr;
};

static int server_init(struct thread_data *td)
{
	struct librpma_fio_server_data *csd;
	struct server_data *sd;
	int ret = -1;

	if ((ret = librpma_fio_server_init(td)))
		return ret;

	csd = td->io_ops_data;

	/* allocate server's data */
	sd = calloc(1, sizeof(*sd));
	if (sd == NULL) {
		td_verror(td, errno, "calloc");
		goto err_server_cleanup;
	}

	/* allocate in-memory queue */
	sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued));
	if (sd->msgs_queued == NULL) {
		td_verror(td, errno, "calloc");
		goto err_free_sd;
	}

	/*
	 * Ensure that a single io_u buffer can store both SEND and RECV
	 * messages and that the io_us buffer allocation is page-size-aligned,
	 * which is required to register the memory for RDMA. User-provided
	 * values are intentionally ignored.
	 */
	td->o.max_bs[DDIR_READ] = IO_U_BUF_LEN;
	td->o.mem_align = page_size;

	csd->server_data = sd;

	return 0;

err_free_sd:
	free(sd);

err_server_cleanup:
	librpma_fio_server_cleanup(td);

	return -1;
}

static int server_post_init(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	size_t io_us_size;
	size_t io_u_buflen;
	int ret;

	/*
	 * td->orig_buffer is not aligned. The engine requires aligned io_us
	 * so FIO aligns up the address using the formula below.
	 */
	sd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
			td->o.mem_align;

	/*
	 * XXX
	 * Each io_u message buffer contains recv and send messages.
	 * Aligning each of those buffers may potentially give
	 * some performance benefits.
	 */
	io_u_buflen = td_max_bs(td);

	/* check whether the io_u buffer is big enough */
	if (io_u_buflen < IO_U_BUF_LEN) {
		log_err(
			"blocksize too small to accommodate assumed maximal request/response pair size (%zu < %d)\n",
			io_u_buflen, IO_U_BUF_LEN);
		return -1;
	}

	/*
	 * Besides the space really consumed by io_us, td->orig_buffer_size
	 * also includes paddings which can be omitted from the memory
	 * registration.
	 */
	io_us_size = (unsigned long long)io_u_buflen *
			(unsigned long long)td->o.iodepth;

	if ((ret = rpma_mr_reg(csd->peer, sd->orig_buffer_aligned, io_us_size,
			RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
			&sd->msg_mr))) {
		librpma_td_verror(td, ret, "rpma_mr_reg");
		return -1;
	}

	return 0;
}

static void server_cleanup(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd;
	int ret;

	if (csd == NULL)
		return;

	sd = csd->server_data;

	if (sd != NULL) {
		/* rpma_mr_dereg(messaging buffer from DRAM) */
		if ((ret = rpma_mr_dereg(&sd->msg_mr)))
			librpma_td_verror(td, ret, "rpma_mr_dereg");

		free(sd->msgs_queued);
		free(sd);
	}

	librpma_fio_server_cleanup(td);
}

static int prepare_connection(struct thread_data *td,
		struct rpma_conn_req *conn_req)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	int ret;
	int i;

	/* prepare buffers for flush requests */
	sd->msg_sqe_available = td->o.iodepth;
	for (i = 0; i < td->o.iodepth; i++) {
		size_t offset_recv_msg = IO_U_BUFF_OFF_SERVER(i) + RECV_OFFSET;
		if ((ret = rpma_conn_req_recv(conn_req, sd->msg_mr,
				offset_recv_msg, MAX_MSG_SIZE,
				(const void *)(uintptr_t)i))) {
			librpma_td_verror(td, ret, "rpma_conn_req_recv");
			return ret;
		}
	}
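
	/*
	 * Note: the RECVs are posted on the connection request, i.e. before
	 * the connection is established, so a flush request cannot arrive
	 * before its receive buffer is ready.
	 */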

	return 0;
}

static int server_open_file(struct thread_data *td, struct fio_file *f)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct rpma_conn_cfg *cfg = NULL;
	uint16_t max_msg_num = td->o.iodepth;
	int ret;

	csd->prepare_connection = prepare_connection;

	/* create a connection configuration object */
	if ((ret = rpma_conn_cfg_new(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_new");
		return -1;
	}

	/*
	 * Calculate the required queue sizes where:
	 * - the send queue (SQ) has to be big enough to accommodate
	 *   all possible flush responses (SENDs)
	 * - the receive queue (RQ) has to be big enough to accommodate
	 *   all flush requests (RECVs)
	 * - the completion queue (CQ) has to be big enough to accommodate
	 *   all success and error completions (sq_size + rq_size)
	 */
	if ((ret = rpma_conn_cfg_set_sq_size(cfg, max_msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_rq_size(cfg, max_msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
		goto err_cfg_delete;
	}

	ret = librpma_fio_server_open_file(td, f, cfg);

err_cfg_delete:
	(void) rpma_conn_cfg_delete(&cfg);

	return ret;
}

static int server_qe_process(struct thread_data *td,
		struct rpma_completion *cmpl)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	GPSPMFlushRequest *flush_req;
	GPSPMFlushResponse flush_resp = GPSPM_FLUSH_RESPONSE__INIT;
	size_t flush_resp_size = 0;
	size_t send_buff_offset;
	size_t recv_buff_offset;
	size_t io_u_buff_offset;
	void *send_buff_ptr;
	void *recv_buff_ptr;
	void *op_ptr;
	int msg_index;
	int ret;

	/* calculate SEND/RECV pair parameters */
	msg_index = (int)(uintptr_t)cmpl->op_context;
	io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index);
	send_buff_offset = io_u_buff_offset + SEND_OFFSET;
	recv_buff_offset = io_u_buff_offset + RECV_OFFSET;
	send_buff_ptr = sd->orig_buffer_aligned + send_buff_offset;
	recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset;

	/* unpack a flush request from the received buffer */
	flush_req = gpspm_flush_request__unpack(NULL, cmpl->byte_len,
			recv_buff_ptr);
	if (flush_req == NULL) {
		log_err("Cannot unpack the flush request buffer\n");
		goto err_terminate;
	}

	if (IS_NOT_THE_LAST_MESSAGE(flush_req)) {
		op_ptr = csd->ws_ptr + flush_req->offset;
		pmem_persist(op_ptr, flush_req->length);
	} else {
		/*
		 * This is the last message - the client is done.
		 */
		gpspm_flush_request__free_unpacked(flush_req, NULL);
		td->done = true;
		return 0;
	}

	/* initiate the next receive operation */
	if ((ret = rpma_recv(csd->conn, sd->msg_mr, recv_buff_offset,
			MAX_MSG_SIZE,
			(const void *)(uintptr_t)msg_index))) {
		librpma_td_verror(td, ret, "rpma_recv");
		goto err_free_unpacked;
	}

	/* prepare a flush response and pack it to a send buffer */
	flush_resp.op_context = flush_req->op_context;
	flush_resp_size = gpspm_flush_response__get_packed_size(&flush_resp);
	if (flush_resp_size > MAX_MSG_SIZE) {
		log_err(
			"Size of the packed flush response is bigger than the available space of the send buffer (%zu > %d)\n",
			flush_resp_size, MAX_MSG_SIZE);
		goto err_free_unpacked;
	}

	(void) gpspm_flush_response__pack(&flush_resp, send_buff_ptr);

	/* send the flush response */
	if ((ret = rpma_send(csd->conn, sd->msg_mr, send_buff_offset,
			flush_resp_size, RPMA_F_COMPLETION_ALWAYS, NULL))) {
		librpma_td_verror(td, ret, "rpma_send");
		goto err_free_unpacked;
	}
	--sd->msg_sqe_available;

	gpspm_flush_request__free_unpacked(flush_req, NULL);

	return 0;

err_free_unpacked:
	gpspm_flush_request__free_unpacked(flush_req, NULL);

err_terminate:
	td->terminate = true;

	return -1;
}

static inline int server_queue_process(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	int ret;
	int i;

	/* min(# of queued entries, # of SQ entries available) */
	uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available);
	if (qes_to_process == 0)
		return 0;

	/* process queued completions */
	for (i = 0; i < qes_to_process; ++i) {
		if ((ret = server_qe_process(td, &sd->msgs_queued[i])))
			return ret;
	}

	/* progress the queue */
	for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) {
		memcpy(&sd->msgs_queued[i],
			&sd->msgs_queued[qes_to_process + i],
			sizeof(sd->msgs_queued[i]));
	}

	sd->msg_queued_nr -= qes_to_process;

	return 0;
}

static int server_cmpl_process(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	struct rpma_completion *cmpl = &sd->msgs_queued[sd->msg_queued_nr];
	struct librpma_fio_options_values *o = td->eo;
	int ret;

	ret = rpma_conn_completion_get(csd->conn, cmpl);
	if (ret == RPMA_E_NO_COMPLETION) {
		if (o->busy_wait_polling == 0) {
			ret = rpma_conn_completion_wait(csd->conn);
			if (ret == RPMA_E_NO_COMPLETION) {
				/* lack of completion is not an error */
				return 0;
			} else if (ret != 0) {
				librpma_td_verror(td, ret, "rpma_conn_completion_wait");
				goto err_terminate;
			}

			ret = rpma_conn_completion_get(csd->conn, cmpl);
			if (ret == RPMA_E_NO_COMPLETION) {
				/* lack of completion is not an error */
				return 0;
			} else if (ret != 0) {
				librpma_td_verror(td, ret, "rpma_conn_completion_get");
				goto err_terminate;
			}
		} else {
			/* lack of completion is not an error */
			return 0;
		}
	} else if (ret != 0) {
		librpma_td_verror(td, ret, "rpma_conn_completion_get");
		goto err_terminate;
	}

	/* validate the completion */
	if (cmpl->op_status != IBV_WC_SUCCESS)
		goto err_terminate;

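	/*
	 * A RECV completion means a flush request has arrived and is queued
	 * for processing; a SEND completion frees one SQ slot for the next
	 * flush response.
	 */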
	if (cmpl->op == RPMA_OP_RECV)
		++sd->msg_queued_nr;
	else if (cmpl->op == RPMA_OP_SEND)
		++sd->msg_sqe_available;

	return 0;

err_terminate:
	td->terminate = true;

	return -1;
}

static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
{
	do {
		if (server_cmpl_process(td))
			return FIO_Q_BUSY;

		if (server_queue_process(td))
			return FIO_Q_BUSY;

	} while (!td->done);

	return FIO_Q_COMPLETED;
}

FIO_STATIC struct ioengine_ops ioengine_server = {
	.name			= "librpma_gpspm_server",
	.version		= FIO_IOOPS_VERSION,
	.init			= server_init,
	.post_init		= server_post_init,
	.open_file		= server_open_file,
	.close_file		= librpma_fio_server_close_file,
	.queue			= server_queue,
	.invalidate		= librpma_fio_file_nop,
	.cleanup		= server_cleanup,
	.flags			= FIO_SYNCIO,
	.options		= librpma_fio_options,
	.option_struct_size	= sizeof(struct librpma_fio_options_values),
};

/* register both engines */

static void fio_init fio_librpma_gpspm_register(void)
{
	register_ioengine(&ioengine_client);
	register_ioengine(&ioengine_server);
}

static void fio_exit fio_librpma_gpspm_unregister(void)
{
	unregister_ioengine(&ioengine_client);
	unregister_ioengine(&ioengine_server);
}