1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
3 * Copyright (C) The University of Tennessee and The University
4 * of Tennessee Research Foundation. 2015. ALL RIGHTS RESERVED.
5 * Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
6 *
7 * See file LICENSE for terms.
8 */
9
10
11 #ifdef HAVE_CONFIG_H
12 # include "config.h"
13 #endif
14
15 #include "api/libperf.h"
16 #include "lib/libperf_int.h"
17
18 #include <ucs/sys/string.h>
19 #include <ucs/sys/sys.h>
20 #include <ucs/sys/sock.h>
21 #include <ucs/debug/log.h>
22
23 #include <sys/socket.h>
24 #include <arpa/inet.h>
25 #include <stdlib.h>
26 #include <stdio.h>
27 #include <unistd.h>
28 #include <netdb.h>
29 #include <getopt.h>
30 #include <string.h>
31 #include <sys/types.h>
32 #include <sys/poll.h>
33 #include <locale.h>
34 #if defined (HAVE_MPI)
35 # include <mpi.h>
36 #elif defined (HAVE_RTE)
37 # include<rte.h>
38 #endif
39
40 #define MAX_BATCH_FILES 32
41 #define MAX_CPUS 1024
42 #define TL_RESOURCE_NAME_NONE "<none>"
43 #define TEST_PARAMS_ARGS "t:n:s:W:O:w:D:i:H:oSCqM:r:T:d:x:A:BUm:"
44 #define TEST_ID_UNDEFINED -1
45
/* Bit flags controlling output and behavior of a test run
 * (stored in perftest_context::flags) */
enum {
    TEST_FLAG_PRINT_RESULTS = UCS_BIT(0),  /* print measured results */
    TEST_FLAG_PRINT_TEST = UCS_BIT(1),     /* print the test description header */
    TEST_FLAG_SET_AFFINITY = UCS_BIT(8),   /* pin to the CPUs given by -c */
    TEST_FLAG_NUMERIC_FMT = UCS_BIT(9),    /* thousands-separator formatting (-N) */
    TEST_FLAG_PRINT_FINAL = UCS_BIT(10),   /* print only the final numbers (-f) */
    TEST_FLAG_PRINT_CSV = UCS_BIT(11)      /* CSV-formatted output (-v) */
};
54
/* State of the socket-based run-time environment connecting the two sides */
typedef struct sock_rte_group {
    int                          is_server; /* nonzero on the server side (rank 0) */
    int                          connfd;    /* connected TCP socket to the peer */
} sock_rte_group_t;
59
60
/* Descriptor of one entry in the tests[] table */
typedef struct test_type {
    const char           *name;         /* name given to the -t option */
    ucx_perf_api_t       api;           /* UCT or UCP */
    ucx_perf_cmd_t       command;       /* operation to benchmark */
    ucx_perf_test_type_t test_type;     /* ping-pong or unidirectional stream */
    const char           *desc;         /* human-readable description */
    const char           *overhead_lat; /* results-column label: "latency"/"overhead" */
    unsigned             window_size;   /* default max outstanding operations */
} test_type_t;
70
71
/* Perf-library parameters plus the index of the selected test */
typedef struct perftest_params {
    ucx_perf_params_t    super;   /* parameters passed to the perf library */
    int                  test_id; /* index into tests[], or TEST_ID_UNDEFINED */
} perftest_params_t;
76
77
/* Global state of a perftest invocation, filled by parse_opts() */
struct perftest_context {
    perftest_params_t            params;          /* test parameters */
    const char                   *server_addr;    /* peer hostname; NULL => server mode */
    int                          port;            /* TCP port for the OOB connection */
    int                          mpi;             /* nonzero when running under MPI */
    unsigned                     num_cpus;        /* number of valid entries in cpus[] */
    unsigned                     cpus[MAX_CPUS];  /* CPU indices from the -c option */
    unsigned                     flags;           /* TEST_FLAG_* bits */

    unsigned                     num_batch_files;              /* -b files given */
    char                         *batch_files[MAX_BATCH_FILES]; /* batch file paths */
    char                         *test_names[MAX_BATCH_FILES];  /* current test name per file */

    sock_rte_group_t             sock_rte_group;  /* socket RTE state */
};
93
94
/**
 * Table of all supported tests, indexed by perftest_params_t::test_id and
 * terminated by an entry with a NULL name.
 * Fields: name (-t argument), API, command, test type, description,
 * results-column label, default outstanding-operations window.
 */
test_type_t tests[] = {
    {"am_lat", UCX_PERF_API_UCT, UCX_PERF_CMD_AM, UCX_PERF_TEST_TYPE_PINGPONG,
     "active message latency", "latency", 1},

    {"put_lat", UCX_PERF_API_UCT, UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_PINGPONG,
     "put latency", "latency", 1},

    {"add_lat", UCX_PERF_API_UCT, UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_PINGPONG,
     "atomic add latency", "latency", 1},

    {"get", UCX_PERF_API_UCT, UCX_PERF_CMD_GET, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "get latency / bandwidth / message rate", "latency", 1},

    {"fadd", UCX_PERF_API_UCT, UCX_PERF_CMD_FADD, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "atomic fetch-and-add latency / rate", "latency", 1},

    {"swap", UCX_PERF_API_UCT, UCX_PERF_CMD_SWAP, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "atomic swap latency / rate", "latency", 1},

    {"cswap", UCX_PERF_API_UCT, UCX_PERF_CMD_CSWAP, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "atomic compare-and-swap latency / rate", "latency", 1},

    {"am_bw", UCX_PERF_API_UCT, UCX_PERF_CMD_AM, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "active message bandwidth / message rate", "overhead", 1},

    {"put_bw", UCX_PERF_API_UCT, UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "put bandwidth / message rate", "overhead", 1},

    {"add_mr", UCX_PERF_API_UCT, UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "atomic add message rate", "overhead", 1},

    {"tag_lat", UCX_PERF_API_UCP, UCX_PERF_CMD_TAG, UCX_PERF_TEST_TYPE_PINGPONG,
     "tag match latency", "latency", 1},

    {"tag_bw", UCX_PERF_API_UCP, UCX_PERF_CMD_TAG, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "tag match bandwidth", "overhead", 32},

    {"tag_sync_lat", UCX_PERF_API_UCP, UCX_PERF_CMD_TAG_SYNC, UCX_PERF_TEST_TYPE_PINGPONG,
     "tag sync match latency", "latency", 1},

    {"tag_sync_bw", UCX_PERF_API_UCP, UCX_PERF_CMD_TAG_SYNC, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "tag sync match bandwidth", "overhead", 32},

    {"ucp_put_lat", UCX_PERF_API_UCP, UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_PINGPONG,
     "put latency", "latency", 1},

    {"ucp_put_bw", UCX_PERF_API_UCP, UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "put bandwidth", "overhead", 32},

    {"ucp_get", UCX_PERF_API_UCP, UCX_PERF_CMD_GET, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "get latency / bandwidth / message rate", "latency", 1},

    {"ucp_add", UCX_PERF_API_UCP, UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "atomic add bandwidth / message rate", "overhead", 1},

    {"ucp_fadd", UCX_PERF_API_UCP, UCX_PERF_CMD_FADD, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "atomic fetch-and-add latency / bandwidth / rate", "latency", 1},

    {"ucp_swap", UCX_PERF_API_UCP, UCX_PERF_CMD_SWAP, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "atomic swap latency / bandwidth / rate", "latency", 1},

    {"ucp_cswap", UCX_PERF_API_UCP, UCX_PERF_CMD_CSWAP, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "atomic compare-and-swap latency / bandwidth / rate", "latency", 1},

    {"stream_bw", UCX_PERF_API_UCP, UCX_PERF_CMD_STREAM, UCX_PERF_TEST_TYPE_STREAM_UNI,
     "stream bandwidth", "overhead", 1},

    {"stream_lat", UCX_PERF_API_UCP, UCX_PERF_CMD_STREAM, UCX_PERF_TEST_TYPE_PINGPONG,
     "stream latency", "latency", 1},

    {NULL}  /* terminator */
};
167
sock_io(int sock,ssize_t (* sock_call)(int,void *,size_t,int),int poll_events,void * data,size_t size,void (* progress)(void * arg),void * arg,const char * name)168 static int sock_io(int sock, ssize_t (*sock_call)(int, void *, size_t, int),
169 int poll_events, void *data, size_t size,
170 void (*progress)(void *arg), void *arg, const char *name)
171 {
172 size_t total = 0;
173 struct pollfd pfd;
174 int ret;
175
176 while (total < size) {
177 pfd.fd = sock;
178 pfd.events = poll_events;
179 pfd.revents = 0;
180
181 ret = poll(&pfd, 1, 1); /* poll for 1ms */
182 if (ret > 0) {
183 ucs_assert(ret == 1);
184 ucs_assert(pfd.revents & poll_events);
185
186 ret = sock_call(sock, (char*)data + total, size - total, 0);
187 if (ret < 0) {
188 ucs_error("%s() failed: %m", name);
189 return -1;
190 }
191 total += ret;
192 } else if ((ret < 0) && (errno != EINTR)) {
193 ucs_error("poll(fd=%d) failed: %m", sock);
194 return -1;
195 }
196
197 /* progress user context */
198 if (progress != NULL) {
199 progress(arg);
200 }
201 }
202 return 0;
203 }
204
/* Blocking send of exactly 'size' bytes, driving the optional progress
 * callback while waiting for the socket to become writable.
 * Returns 0 on success, -1 on error. */
static int safe_send(int sock, void *data, size_t size,
                     void (*progress)(void *arg), void *arg)
{
    typedef ssize_t (*sock_call)(int, void *, size_t, int);
    /* send() takes const void*, so it is cast to the common I/O signature */
    sock_call send_func = (sock_call)send;

    return sock_io(sock, send_func, POLLOUT, data, size, progress, arg,
                   "send");
}
212
/* Blocking receive of exactly 'size' bytes, driving the optional progress
 * callback while waiting for data. Returns 0 on success, -1 on error. */
static int safe_recv(int sock, void *data, size_t size,
                     void (*progress)(void *arg), void *arg)
{
    const char *op_name = "recv";

    return sock_io(sock, recv, POLLIN, data, size, progress, arg, op_name);
}
218
/**
 * Print one line of measurement results in the format selected by 'flags'
 * (plain table / numeric with thousands separators / CSV).
 *
 * @param test_names      Test names from the batch files (CSV line prefix).
 * @param num_names       Number of entries in 'test_names'.
 * @param result          Measured latency / bandwidth / message-rate values.
 * @param flags           TEST_FLAG_* bits controlling the output.
 * @param final           Nonzero for the final report of a test run.
 * @param is_server       Unused by this function.
 * @param is_multi_thread Nonzero when reporting an aggregate over multiple
 *                        threads - only totals are printed in that case.
 */
static void print_progress(char **test_names, unsigned num_names,
                           const ucx_perf_result_t *result, unsigned flags,
                           int final, int is_server, int is_multi_thread)
{
    static const char *fmt_csv;
    static const char *fmt_numeric;
    static const char *fmt_plain;
    unsigned i;

    /* skip when results are disabled, or when this is an intermediate
     * report and only final numbers were requested (-f) */
    if (!(flags & TEST_FLAG_PRINT_RESULTS) ||
        (!final && (flags & TEST_FLAG_PRINT_FINAL)))
    {
        return;
    }

    if (flags & TEST_FLAG_PRINT_CSV) {
        for (i = 0; i < num_names; ++i) {
            printf("%s,", test_names[i]);
        }
    }

#if _OPENMP
    /* tag intermediate lines with the reporting thread's id */
    if (!final) {
        printf("[thread %d]", omp_get_thread_num());
    } else if (flags & TEST_FLAG_PRINT_RESULTS) {
        printf("Final: ");
    }
#endif

    if (is_multi_thread && final) {
        /* aggregated multi-thread report - only total averages */
        fmt_csv     = "%4.0f,%.3f,%.2f,%.0f\n";
        fmt_numeric = "%'18.0f %29.3f %22.2f %'24.0f\n";
        fmt_plain   = "%18.0f %29.3f %22.2f %23.0f\n";

        printf((flags & TEST_FLAG_PRINT_CSV)   ? fmt_csv :
               (flags & TEST_FLAG_NUMERIC_FMT) ? fmt_numeric :
               fmt_plain,
               (double)result->iters,
               result->latency.total_average * 1000000.0,           /* sec -> usec */
               result->bandwidth.total_average / (1024.0 * 1024.0), /* bytes/s -> MB/s */
               result->msgrate.total_average);
    } else {
        fmt_csv     = "%4.0f,%.3f,%.3f,%.3f,%.2f,%.2f,%.0f,%.0f\n";
        fmt_numeric = "%'18.0f %9.3f %9.3f %9.3f %11.2f %10.2f %'11.0f %'11.0f\n";
        fmt_plain   = "%18.0f %9.3f %9.3f %9.3f %11.2f %10.2f %11.0f %11.0f\n";

        printf((flags & TEST_FLAG_PRINT_CSV)   ? fmt_csv :
               (flags & TEST_FLAG_NUMERIC_FMT) ? fmt_numeric :
               fmt_plain,
               (double)result->iters,
               result->latency.typical * 1000000.0,
               result->latency.moment_average * 1000000.0,
               result->latency.total_average * 1000000.0,
               result->bandwidth.moment_average / (1024.0 * 1024.0),
               result->bandwidth.total_average / (1024.0 * 1024.0),
               result->msgrate.moment_average,
               result->msgrate.total_average);
    }

    fflush(stdout);
}
280
/**
 * Print the test-description header and the results-table header according
 * to the output flags (plain table, or CSV column names).
 */
static void print_header(struct perftest_context *ctx)
{
    const char *overhead_lat_str;
    const char *test_data_str;
    const char *test_api_str;
    test_type_t *test;
    unsigned i;

    /* the test may be undefined when running from batch files */
    test = (ctx->params.test_id == TEST_ID_UNDEFINED) ? NULL :
           &tests[ctx->params.test_id];

    if ((ctx->flags & TEST_FLAG_PRINT_TEST) && (test != NULL)) {
        if (test->api == UCX_PERF_API_UCT) {
            test_api_str = "transport layer";
            switch (ctx->params.super.uct.data_layout) {
            case UCT_PERF_DATA_LAYOUT_SHORT:
                test_data_str = "short";
                break;
            case UCT_PERF_DATA_LAYOUT_BCOPY:
                test_data_str = "bcopy";
                break;
            case UCT_PERF_DATA_LAYOUT_ZCOPY:
                test_data_str = "zcopy";
                break;
            default:
                test_data_str = "(undefined)";
                break;
            }
        } else if (test->api == UCX_PERF_API_UCP) {
            test_api_str = "protocol layer";
            test_data_str = "(automatic)"; /* TODO contig/stride/stream */
        } else {
            return; /* unknown API - nothing sensible to print */
        }

        printf("+------------------------------------------------------------------------------------------+\n");
        printf("| API: %-60s |\n", test_api_str);
        printf("| Test: %-60s |\n", test->desc);
        printf("| Data layout: %-60s |\n", test_data_str);
        printf("| Send memory: %-60s |\n", ucs_memory_type_names[ctx->params.super.send_mem_type]);
        printf("| Recv memory: %-60s |\n", ucs_memory_type_names[ctx->params.super.recv_mem_type]);
        printf("| Message size: %-60zu |\n", ucx_perf_get_message_size(&ctx->params.super));
    }

    if (ctx->flags & TEST_FLAG_PRINT_CSV) {
        if (ctx->flags & TEST_FLAG_PRINT_RESULTS) {
            /* CSV header: one column per batch file, then the result columns */
            for (i = 0; i < ctx->num_batch_files; ++i) {
                printf("%s,", basename(ctx->batch_files[i]));
            }
            printf("iterations,typical_lat,avg_lat,overall_lat,avg_bw,overall_bw,avg_mr,overall_mr\n");
        }
    } else {
        if (ctx->flags & TEST_FLAG_PRINT_RESULTS) {
            /* the middle column group is labeled "latency" or "overhead"
             * depending on the selected test */
            overhead_lat_str = (test == NULL) ? "overhead" : test->overhead_lat;

            printf("+--------------+--------------+-----------------------------+---------------------+-----------------------+\n");
            printf("| | | %8s (usec) | bandwidth (MB/s) | message rate (msg/s) |\n", overhead_lat_str);
            printf("+--------------+--------------+---------+---------+---------+----------+----------+-----------+-----------+\n");
            printf("| Stage | # iterations | typical | average | overall | average | overall | average | overall |\n");
            printf("+--------------+--------------+---------+---------+---------+----------+----------+-----------+-----------+\n");
        } else if (ctx->flags & TEST_FLAG_PRINT_TEST) {
            printf("+------------------------------------------------------------------------------------------+\n");
        }
    }
}
346
print_test_name(struct perftest_context * ctx)347 static void print_test_name(struct perftest_context *ctx)
348 {
349 char buf[200];
350 unsigned i, pos;
351
352 if (!(ctx->flags & TEST_FLAG_PRINT_CSV) && (ctx->num_batch_files > 0)) {
353 strcpy(buf, "+--------------+---------+---------+---------+----------+----------+-----------+-----------+");
354
355 pos = 1;
356 for (i = 0; i < ctx->num_batch_files; ++i) {
357 if (i != 0) {
358 buf[pos++] = '/';
359 }
360 memcpy(&buf[pos], ctx->test_names[i],
361 ucs_min(strlen(ctx->test_names[i]), sizeof(buf) - pos - 1));
362 pos += strlen(ctx->test_names[i]);
363 }
364
365 if (ctx->flags & TEST_FLAG_PRINT_RESULTS) {
366 printf("%s\n", buf);
367 }
368 }
369 }
370
print_memory_type_usage(void)371 static void print_memory_type_usage(void)
372 {
373 ucs_memory_type_t it;
374 for (it = UCS_MEMORY_TYPE_HOST; it < UCS_MEMORY_TYPE_LAST; it++) {
375 if (ucx_perf_mem_type_allocators[it] != NULL) {
376 printf(" %s - %s\n",
377 ucs_memory_type_names[it],
378 ucs_memory_type_descs[it]);
379 }
380 }
381 }
382
/**
 * Print the command-line help text, including the list of available tests
 * and the current default values taken from 'ctx'. Under MPI, only rank 0
 * prints.
 */
static void usage(const struct perftest_context *ctx, const char *program)
{
    static const char* api_names[] = {
        [UCX_PERF_API_UCT] = "UCT",
        [UCX_PERF_API_UCP] = "UCP"
    };
    test_type_t *test;
    int UCS_V_UNUSED rank;

#ifdef HAVE_MPI
    /* avoid every rank printing the same help text */
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    if (ctx->mpi && (rank != 0)) {
        return;
    }
#endif

#if defined (HAVE_MPI)
    printf(" Note: test can be also launched as an MPI application\n");
    printf("\n");
#elif defined (HAVE_RTE)
    printf(" Note: this test can be also launched as an libRTE application\n");
    printf("\n");
#endif
    printf(" Usage: %s [ server-hostname ] [ options ]\n", program);
    printf("\n");
    printf(" Common options:\n");
    printf(" -t <test> test to run:\n");
    /* list every entry of the tests[] table with its API */
    for (test = tests; test->name; ++test) {
        printf(" %13s - %s %s\n", test->name,
               api_names[test->api], test->desc);
    }
    printf("\n");
    printf(" -s <size> list of scatter-gather sizes for single message (%zu)\n",
           ctx->params.super.msg_size_list[0]);
    printf(" for example: \"-s 16,48,8192,8192,14\"\n");
    printf(" -m <send mem type>[,<recv mem type>]\n");
    printf(" memory type of message for sender and receiver (host)\n");
    print_memory_type_usage();
    printf(" -n <iters> number of iterations to run (%ld)\n", ctx->params.super.max_iter);
    printf(" -w <iters> number of warm-up iterations (%zu)\n",
           ctx->params.super.warmup_iter);
    printf(" -c <cpulist> set affinity to this CPU list (separated by comma) (off)\n");
    printf(" -O <count> maximal number of uncompleted outstanding sends\n");
    printf(" -i <offset> distance between consecutive scatter-gather entries (%zu)\n",
           ctx->params.super.iov_stride);
    printf(" -T <threads> number of threads in the test (%d)\n",
           ctx->params.super.thread_count);
    printf(" -B register memory with NONBLOCK flag\n");
    printf(" -b <file> read and execute tests from a batch file: every line in the\n");
    printf(" file is a test to run, first word is test name, the rest of\n");
    printf(" the line is command-line arguments for the test.\n");
    printf(" -p <port> TCP port to use for data exchange (%d)\n", ctx->port);
#ifdef HAVE_MPI
    printf(" -P <0|1> disable/enable MPI mode (%d)\n", ctx->mpi);
#endif
    printf(" -h show this help message\n");
    printf("\n");
    printf(" Output format:\n");
    printf(" -N use numeric formatting (thousands separator)\n");
    printf(" -f print only final numbers\n");
    printf(" -v print CSV-formatted output\n");
    printf("\n");
    printf(" UCT only:\n");
    printf(" -d <device> device to use for testing\n");
    printf(" -x <tl> transport to use for testing\n");
    printf(" -D <layout> data layout for sender side:\n");
    printf(" short - short messages (default, cannot be used for get)\n");
    printf(" bcopy - copy-out (cannot be used for atomics)\n");
    printf(" zcopy - zero-copy (cannot be used for atomics)\n");
    printf(" iov - scatter-gather list (iovec)\n");
    printf(" -W <count> flow control window size, for active messages (%u)\n",
           ctx->params.super.uct.fc_window);
    printf(" -H <size> active message header size (%zu)\n",
           ctx->params.super.am_hdr_size);
    printf(" -A <mode> asynchronous progress mode (thread_spinlock)\n");
    printf(" thread_spinlock - separate progress thread with spin locking\n");
    printf(" thread_mutex - separate progress thread with mutex locking\n");
    printf(" signal - signal-based timer\n");
    printf("\n");
    printf(" UCP only:\n");
    printf(" -M <thread> thread support level for progress engine (single)\n");
    printf(" single - only the master thread can access\n");
    printf(" serialized - one thread can access at a time\n");
    printf(" multi - multiple threads can access\n");
    printf(" -D <layout>[,<layout>]\n");
    printf(" data layout for sender and receiver side (contig)\n");
    printf(" contig - Continuous datatype\n");
    printf(" iov - Scatter-gather list\n");
    printf(" -C use wild-card tag for tag tests\n");
    printf(" -U force unexpected flow by using tag probe\n");
    printf(" -r <mode> receive mode for stream tests (recv)\n");
    printf(" recv : Use ucp_stream_recv_nb\n");
    printf(" recv_data : Use ucp_stream_recv_data_nb\n");
    printf("\n");
    printf(" NOTE: When running UCP tests, transport and device should be specified by\n");
    printf(" environment variables: UCX_TLS and UCX_[SELF|SHM|NET]_DEVICES.\n");
    printf("\n");
}
481
parse_ucp_datatype_params(const char * opt_arg,ucp_perf_datatype_t * datatype)482 static ucs_status_t parse_ucp_datatype_params(const char *opt_arg,
483 ucp_perf_datatype_t *datatype)
484 {
485 const char *iov_type = "iov";
486 const size_t iov_type_size = strlen("iov");
487 const char *contig_type = "contig";
488 const size_t contig_type_size = strlen("contig");
489
490 if (0 == strncmp(opt_arg, iov_type, iov_type_size)) {
491 *datatype = UCP_PERF_DATATYPE_IOV;
492 } else if (0 == strncmp(opt_arg, contig_type, contig_type_size)) {
493 *datatype = UCP_PERF_DATATYPE_CONTIG;
494 } else {
495 return UCS_ERR_INVALID_PARAM;
496 }
497
498 return UCS_OK;
499 }
500
parse_mem_type(const char * opt_arg,ucs_memory_type_t * mem_type)501 static ucs_status_t parse_mem_type(const char *opt_arg,
502 ucs_memory_type_t *mem_type)
503 {
504 ucs_memory_type_t it;
505 for (it = UCS_MEMORY_TYPE_HOST; it < UCS_MEMORY_TYPE_LAST; it++) {
506 if(!strcmp(opt_arg, ucs_memory_type_names[it]) &&
507 (ucx_perf_mem_type_allocators[it] != NULL)) {
508 *mem_type = it;
509 return UCS_OK;
510 }
511 }
512 ucs_error("Unsupported memory type: \"%s\"", opt_arg);
513 return UCS_ERR_INVALID_PARAM;
514 }
515
parse_mem_type_params(const char * opt_arg,ucs_memory_type_t * send_mem_type,ucs_memory_type_t * recv_mem_type)516 static ucs_status_t parse_mem_type_params(const char *opt_arg,
517 ucs_memory_type_t *send_mem_type,
518 ucs_memory_type_t *recv_mem_type)
519 {
520 const char *delim = ",";
521 char *token = strtok((char*)opt_arg, delim);
522
523 if (UCS_OK != parse_mem_type(token, send_mem_type)) {
524 return UCS_ERR_INVALID_PARAM;
525 }
526
527 token = strtok(NULL, delim);
528 if (NULL == token) {
529 *recv_mem_type = *send_mem_type;
530 return UCS_OK;
531 } else {
532 return parse_mem_type(token, recv_mem_type);
533 }
534 }
535
parse_message_sizes_params(const char * opt_arg,ucx_perf_params_t * params)536 static ucs_status_t parse_message_sizes_params(const char *opt_arg,
537 ucx_perf_params_t *params)
538 {
539 const char delim = ',';
540 size_t *msg_size_list, token_num, token_it;
541 char *optarg_ptr, *optarg_ptr2;
542
543 optarg_ptr = (char *)opt_arg;
544 token_num = 0;
545 /* count the number of given message sizes */
546 while ((optarg_ptr = strchr(optarg_ptr, delim)) != NULL) {
547 ++optarg_ptr;
548 ++token_num;
549 }
550 ++token_num;
551
552 msg_size_list = realloc(params->msg_size_list,
553 sizeof(*params->msg_size_list) * token_num);
554 if (NULL == msg_size_list) {
555 return UCS_ERR_NO_MEMORY;
556 }
557
558 params->msg_size_list = msg_size_list;
559
560 optarg_ptr = (char *)opt_arg;
561 errno = 0;
562 for (token_it = 0; token_it < token_num; ++token_it) {
563 params->msg_size_list[token_it] = strtoul(optarg_ptr, &optarg_ptr2, 10);
564 if (((ERANGE == errno) && (ULONG_MAX == params->msg_size_list[token_it])) ||
565 ((errno != 0) && (params->msg_size_list[token_it] == 0)) ||
566 (optarg_ptr == optarg_ptr2)) {
567 free(params->msg_size_list);
568 params->msg_size_list = NULL; /* prevent double free */
569 ucs_error("Invalid option substring argument at position %lu", token_it);
570 return UCS_ERR_INVALID_PARAM;
571 }
572 optarg_ptr = optarg_ptr2 + 1;
573 }
574
575 params->msg_size_cnt = token_num;
576 return UCS_OK;
577 }
578
/**
 * Fill 'params' with default values for all test parameters and allocate
 * the initial message-size list (a single 8-byte message). The caller owns
 * params->super.msg_size_list and must free it.
 *
 * @return UCS_OK, or UCS_ERR_NO_MEMORY if the size-list allocation fails.
 */
static ucs_status_t init_test_params(perftest_params_t *params)
{
    memset(params, 0, sizeof(*params));
    params->super.api = UCX_PERF_API_LAST;
    params->super.command = UCX_PERF_CMD_LAST;
    params->super.test_type = UCX_PERF_TEST_TYPE_LAST;
    params->super.thread_mode = UCS_THREAD_MODE_SINGLE;
    params->super.thread_count = 1;
    params->super.async_mode = UCS_ASYNC_THREAD_LOCK_TYPE;
    params->super.wait_mode = UCX_PERF_WAIT_MODE_LAST;
    params->super.max_outstanding = 0; /* 0 => use the test's default window */
    params->super.warmup_iter = 10000;
    params->super.am_hdr_size = 8;
    params->super.alignment = ucs_get_page_size();
    params->super.max_iter = 1000000l;
    params->super.max_time = 0.0;
    params->super.report_interval = 1.0;
    params->super.flags = UCX_PERF_TEST_FLAG_VERBOSE;
    params->super.uct.fc_window = UCT_PERF_TEST_MAX_FC_WINDOW;
    params->super.uct.data_layout = UCT_PERF_DATA_LAYOUT_SHORT;
    params->super.send_mem_type = UCS_MEMORY_TYPE_HOST;
    params->super.recv_mem_type = UCS_MEMORY_TYPE_HOST;
    params->super.msg_size_cnt = 1;
    params->super.iov_stride = 0;
    params->super.ucp.send_datatype = UCP_PERF_DATATYPE_CONTIG;
    params->super.ucp.recv_datatype = UCP_PERF_DATATYPE_CONTIG;
    /* no device/transport selected yet */
    strcpy(params->super.uct.dev_name, TL_RESOURCE_NAME_NONE);
    strcpy(params->super.uct.tl_name, TL_RESOURCE_NAME_NONE);

    params->super.msg_size_list = calloc(params->super.msg_size_cnt,
                                         sizeof(*params->super.msg_size_list));
    if (params->super.msg_size_list == NULL) {
        return UCS_ERR_NO_MEMORY;
    }

    params->super.msg_size_list[0] = 8; /* default message size */
    params->test_id = TEST_ID_UNDEFINED; /* must be selected with -t */

    return UCS_OK;
}
619
/**
 * Apply a single getopt-style option (one of TEST_PARAMS_ARGS) to the test
 * parameters. Used both for the main command line and for batch-file lines.
 *
 * @param params  Parameters to update.
 * @param opt     Option character.
 * @param opt_arg Option argument string.
 *
 * @return UCS_OK, or UCS_ERR_INVALID_PARAM for an unknown option/argument.
 */
static ucs_status_t parse_test_params(perftest_params_t *params, char opt,
                                      const char *opt_arg)
{
    char *optarg2 = NULL;
    test_type_t *test;
    unsigned i;

    switch (opt) {
    case 'd': /* UCT device name */
        ucs_snprintf_zero(params->super.uct.dev_name,
                          sizeof(params->super.uct.dev_name), "%s", opt_arg);
        return UCS_OK;
    case 'x': /* UCT transport name */
        ucs_snprintf_zero(params->super.uct.tl_name,
                          sizeof(params->super.uct.tl_name), "%s", opt_arg);
        return UCS_OK;
    case 't': /* test name - look it up in the tests[] table */
        for (i = 0; tests[i].name != NULL; ++i) {
            test = &tests[i];
            if (!strcmp(opt_arg, test->name)) {
                params->super.api = test->api;
                params->super.command = test->command;
                params->super.test_type = test->test_type;
                params->test_id = i;
                break;
            }
        }
        if (params->test_id == TEST_ID_UNDEFINED) {
            ucs_error("Invalid option argument for -t");
            return UCS_ERR_INVALID_PARAM;
        }
        return UCS_OK;
    case 'D': /* data layout: UCT keywords first, otherwise UCP datatype(s) */
        if (!strcmp(opt_arg, "short")) {
            params->super.uct.data_layout = UCT_PERF_DATA_LAYOUT_SHORT;
        } else if (!strcmp(opt_arg, "bcopy")) {
            params->super.uct.data_layout = UCT_PERF_DATA_LAYOUT_BCOPY;
        } else if (!strcmp(opt_arg, "zcopy")) {
            params->super.uct.data_layout = UCT_PERF_DATA_LAYOUT_ZCOPY;
        } else if (UCS_OK == parse_ucp_datatype_params(opt_arg,
                                                       &params->super.ucp.send_datatype)) {
            /* optional ",<recv datatype>" suffix */
            optarg2 = strchr(opt_arg, ',');
            if (optarg2) {
                if (UCS_OK != parse_ucp_datatype_params(optarg2 + 1,
                                                        &params->super.ucp.recv_datatype)) {
                    return UCS_ERR_INVALID_PARAM;
                }
            }
        } else {
            ucs_error("Invalid option argument for -D");
            return UCS_ERR_INVALID_PARAM;
        }
        return UCS_OK;
    case 'i': /* distance between consecutive scatter-gather entries */
        params->super.iov_stride = atol(opt_arg);
        return UCS_OK;
    case 'n': /* number of iterations */
        params->super.max_iter = atol(opt_arg);
        return UCS_OK;
    case 's': /* comma-separated list of message sizes */
        return parse_message_sizes_params(opt_arg, &params->super);
    case 'H': /* active message header size */
        params->super.am_hdr_size = atol(opt_arg);
        return UCS_OK;
    case 'W': /* flow-control window size */
        params->super.uct.fc_window = atoi(opt_arg);
        return UCS_OK;
    case 'O': /* maximal number of outstanding sends */
        params->super.max_outstanding = atoi(opt_arg);
        return UCS_OK;
    case 'w': /* warm-up iterations */
        params->super.warmup_iter = atol(opt_arg);
        return UCS_OK;
    case 'o':
        params->super.flags |= UCX_PERF_TEST_FLAG_ONE_SIDED;
        return UCS_OK;
    case 'B': /* register memory with NONBLOCK flag */
        params->super.flags |= UCX_PERF_TEST_FLAG_MAP_NONBLOCK;
        return UCS_OK;
    case 'q': /* quiet mode */
        params->super.flags &= ~UCX_PERF_TEST_FLAG_VERBOSE;
        return UCS_OK;
    case 'C': /* wild-card tag for tag tests */
        params->super.flags |= UCX_PERF_TEST_FLAG_TAG_WILDCARD;
        return UCS_OK;
    case 'U': /* force unexpected flow via tag probe */
        params->super.flags |= UCX_PERF_TEST_FLAG_TAG_UNEXP_PROBE;
        return UCS_OK;
    case 'M': /* UCP thread support level */
        if (!strcmp(opt_arg, "single")) {
            params->super.thread_mode = UCS_THREAD_MODE_SINGLE;
            return UCS_OK;
        } else if (!strcmp(opt_arg, "serialized")) {
            params->super.thread_mode = UCS_THREAD_MODE_SERIALIZED;
            return UCS_OK;
        } else if (!strcmp(opt_arg, "multi")) {
            params->super.thread_mode = UCS_THREAD_MODE_MULTI;
            return UCS_OK;
        } else {
            ucs_error("Invalid option argument for -M");
            return UCS_ERR_INVALID_PARAM;
        }
    case 'T': /* number of test threads */
        params->super.thread_count = atoi(opt_arg);
        return UCS_OK;
    case 'A': /* asynchronous progress mode ("thread" is a legacy alias) */
        if (!strcmp(opt_arg, "thread") || !strcmp(opt_arg, "thread_spinlock")) {
            params->super.async_mode = UCS_ASYNC_MODE_THREAD_SPINLOCK;
            return UCS_OK;
        } else if (!strcmp(opt_arg, "thread_mutex")) {
            params->super.async_mode = UCS_ASYNC_MODE_THREAD_MUTEX;
            return UCS_OK;
        } else if (!strcmp(opt_arg, "signal")) {
            params->super.async_mode = UCS_ASYNC_MODE_SIGNAL;
            return UCS_OK;
        } else {
            ucs_error("Invalid option argument for -A");
            return UCS_ERR_INVALID_PARAM;
        }
    case 'r': /* stream receive mode */
        if (!strcmp(opt_arg, "recv_data")) {
            params->super.flags |= UCX_PERF_TEST_FLAG_STREAM_RECV_DATA;
            return UCS_OK;
        } else if (!strcmp(opt_arg, "recv")) {
            params->super.flags &= ~UCX_PERF_TEST_FLAG_STREAM_RECV_DATA;
            return UCS_OK;
        }
        return UCS_ERR_INVALID_PARAM;
    case 'm': /* send/receive memory types */
        if (UCS_OK != parse_mem_type_params(opt_arg,
                                            &params->super.send_mem_type,
                                            &params->super.recv_mem_type)) {
            return UCS_ERR_INVALID_PARAM;
        }
        return UCS_OK;
    default:
        return UCS_ERR_INVALID_PARAM;
    }
}
759
adjust_test_params(perftest_params_t * params,const char * error_prefix)760 static ucs_status_t adjust_test_params(perftest_params_t *params,
761 const char *error_prefix)
762 {
763 test_type_t *test;
764
765 if (params->test_id == TEST_ID_UNDEFINED) {
766 ucs_error("%smissing test name", error_prefix);
767 return UCS_ERR_INVALID_PARAM;
768 }
769
770 test = &tests[params->test_id];
771
772 if (params->super.max_outstanding == 0) {
773 params->super.max_outstanding = test->window_size;
774 }
775
776 return UCS_OK;
777 }
778
/**
 * Read the next test line from a batch file and parse its options into
 * 'params'. Blank lines and lines starting with '#' are skipped. The first
 * word of a line is the test's display name (returned via *test_name_p,
 * strdup-allocated - caller frees); the remaining words are getopt-style
 * options.
 *
 * @return UCS_OK, UCS_ERR_NO_ELEM at end of file, or a parsing error.
 */
static ucs_status_t read_batch_file(FILE *batch_file, const char *file_name,
                                    int *line_num, perftest_params_t *params,
                                    char** test_name_p)
{
#define MAX_SIZE 256
#define MAX_ARG_SIZE 2048
    ucs_status_t status;
    char buf[MAX_ARG_SIZE];
    char error_prefix[MAX_ARG_SIZE];
    int argc;
    char *argv[MAX_SIZE + 1];
    int c;
    char *p;

    do {
        if (fgets(buf, sizeof(buf) - 1, batch_file) == NULL) {
            return UCS_ERR_NO_ELEM; /* end of batch file */
        }
        ++(*line_num);

        /* split the line into whitespace-separated tokens (in place) */
        argc = 0;
        p = strtok(buf, " \t\n\r");
        while (p && (argc < MAX_SIZE)) {
            argv[argc++] = p;
            p = strtok(NULL, " \t\n\r");
        }
        argv[argc] = NULL;
    } while ((argc == 0) || (argv[0][0] == '#')); /* skip blanks/comments */

    ucs_snprintf_safe(error_prefix, sizeof(error_prefix),
                      "in batch file '%s' line %d: ", file_name, *line_num);

    /* reset getopt state - it was already used on the main command line;
     * argv[0] (the test name) is naturally skipped as the "program name" */
    optind = 1;
    while ((c = getopt (argc, argv, TEST_PARAMS_ARGS)) != -1) {
        status = parse_test_params(params, c, optarg);
        if (status != UCS_OK) {
            ucs_error("%s-%c %s: %s", error_prefix, c, optarg,
                      ucs_status_string(status));
            return status;
        }
    }

    status = adjust_test_params(params, error_prefix);
    if (status != UCS_OK) {
        return status;
    }

    /* NOTE(review): strdup() result is not checked for NULL - confirm
     * callers tolerate a NULL test name on allocation failure */
    *test_name_p = strdup(argv[0]);
    return UCS_OK;
}
829
parse_cpus(char * opt_arg,struct perftest_context * ctx)830 static ucs_status_t parse_cpus(char *opt_arg, struct perftest_context *ctx)
831 {
832 char *endptr, *cpu_list = opt_arg;
833 int cpu;
834
835 ctx->num_cpus = 0;
836 cpu = strtol(cpu_list, &endptr, 10);
837
838 while (((*endptr == ',') || (*endptr == '\0')) && (ctx->num_cpus < MAX_CPUS)) {
839 if (cpu < 0) {
840 ucs_error("invalid cpu number detected: (%d)", cpu);
841 return UCS_ERR_INVALID_PARAM;
842 }
843
844 ctx->cpus[ctx->num_cpus++] = cpu;
845
846 if (*endptr == '\0') {
847 break;
848 }
849
850 cpu_list = endptr + 1; /* skip the comma */
851 cpu = strtol(cpu_list, &endptr, 10);
852 }
853
854 if (*endptr == ',') {
855 ucs_error("number of listed cpus exceeds the maximum supported value (%d)",
856 MAX_CPUS);
857 return UCS_ERR_INVALID_PARAM;
858 }
859
860 return UCS_OK;
861 }
862
/**
 * Parse the main command line into 'ctx'. Initializes default test
 * parameters, handles the global options here, and forwards per-test
 * options to parse_test_params(). A trailing non-option argument is taken
 * as the server hostname (client mode).
 *
 * @return UCS_OK, UCS_ERR_CANCELED after printing usage (-h), or an error.
 */
static ucs_status_t parse_opts(struct perftest_context *ctx, int mpi_initialized,
                               int argc, char **argv)
{
    ucs_status_t status;
    int c;

    ucs_trace_func("");

    ucx_perf_global_init(); /* initialize memory types */

    status = init_test_params(&ctx->params);
    if (status != UCS_OK) {
        return status;
    }

    ctx->server_addr = NULL;
    ctx->num_batch_files = 0;
    ctx->port = 13337; /* default TCP port for the out-of-band connection */
    ctx->flags = 0;
    ctx->mpi = mpi_initialized;

    optind = 1;
    while ((c = getopt (argc, argv, "p:b:Nfvc:P:h" TEST_PARAMS_ARGS)) != -1) {
        switch (c) {
        case 'p': /* TCP port for data exchange */
            ctx->port = atoi(optarg);
            break;
        case 'b': /* batch file; may be given multiple times */
            if (ctx->num_batch_files < MAX_BATCH_FILES) {
                ctx->batch_files[ctx->num_batch_files++] = optarg;
            }
            break;
        case 'N':
            ctx->flags |= TEST_FLAG_NUMERIC_FMT;
            break;
        case 'f':
            ctx->flags |= TEST_FLAG_PRINT_FINAL;
            break;
        case 'v':
            ctx->flags |= TEST_FLAG_PRINT_CSV;
            break;
        case 'c': /* CPU affinity list */
            ctx->flags |= TEST_FLAG_SET_AFFINITY;
            status = parse_cpus(optarg, ctx);
            if (status != UCS_OK) {
                return status;
            }
            break;
        case 'P':
#ifdef HAVE_MPI
            ctx->mpi = atoi(optarg) && mpi_initialized;
            break;
#endif
            /* NOTE(review): without HAVE_MPI, 'P' falls through to 'h' and
             * prints usage then cancels - confirm this is intended */
        case 'h':
            usage(ctx, ucs_basename(argv[0]));
            return UCS_ERR_CANCELED;
        default: /* every other option belongs to the per-test parser */
            status = parse_test_params(&ctx->params, c, optarg);
            if (status != UCS_OK) {
                usage(ctx, ucs_basename(argv[0]));
                return status;
            }
            break;
        }
    }

    /* a remaining positional argument is the server address (client mode) */
    if (optind < argc) {
        ctx->server_addr = argv[optind];
    }

    return UCS_OK;
}
935
/* The socket RTE always connects exactly two processes (server + client),
 * so the group size is a constant. */
static unsigned sock_rte_group_size(void *rte_group)
{
    (void)rte_group; /* unused - size does not depend on the group */
    return 2;
}
940
sock_rte_group_index(void * rte_group)941 static unsigned sock_rte_group_index(void *rte_group)
942 {
943 sock_rte_group_t *group = rte_group;
944 return group->is_server ? 0 : 1;
945 }
946
/**
 * Two-process barrier over the socket RTE: the OpenMP master thread
 * exchanges a magic word with the peer while the surrounding omp barriers
 * hold all other threads until the exchange completes. 'progress' is driven
 * while waiting on the socket.
 */
static void sock_rte_barrier(void *rte_group, void (*progress)(void *arg),
                             void *arg)
{
#pragma omp barrier

#pragma omp master
    {
        sock_rte_group_t *group = rte_group;
        const unsigned magic = 0xdeadbeef;
        unsigned snc;

        /* send our token to the peer */
        snc = magic;
        safe_send(group->connfd, &snc, sizeof(unsigned), progress, arg);

        /* receive the peer's token and verify it */
        snc = 0;
        safe_recv(group->connfd, &snc, sizeof(unsigned), progress, arg);

        ucs_assert(snc == magic);
    }
#pragma omp barrier
}
968
sock_rte_post_vec(void * rte_group,const struct iovec * iovec,int iovcnt,void ** req)969 static void sock_rte_post_vec(void *rte_group, const struct iovec *iovec,
970 int iovcnt, void **req)
971 {
972 sock_rte_group_t *group = rte_group;
973 size_t size;
974 int i;
975
976 size = 0;
977 for (i = 0; i < iovcnt; ++i) {
978 size += iovec[i].iov_len;
979 }
980
981 safe_send(group->connfd, &size, sizeof(size), NULL, NULL);
982 for (i = 0; i < iovcnt; ++i) {
983 safe_send(group->connfd, iovec[i].iov_base, iovec[i].iov_len, NULL,
984 NULL);
985 }
986 }
987
sock_rte_recv(void * rte_group,unsigned src,void * buffer,size_t max,void * req)988 static void sock_rte_recv(void *rte_group, unsigned src, void *buffer,
989 size_t max, void *req)
990 {
991 sock_rte_group_t *group = rte_group;
992 int group_index;
993 size_t size;
994
995 group_index = sock_rte_group_index(rte_group);
996 if (src == group_index) {
997 return;
998 }
999
1000 ucs_assert_always(src == (1 - group_index));
1001 safe_recv(group->connfd, &size, sizeof(size), NULL, NULL);
1002 ucs_assert_always(size <= max);
1003 safe_recv(group->connfd, buffer, size, NULL, NULL);
1004 }
1005
sock_rte_report(void * rte_group,const ucx_perf_result_t * result,void * arg,int is_final,int is_multi_thread)1006 static void sock_rte_report(void *rte_group, const ucx_perf_result_t *result,
1007 void *arg, int is_final, int is_multi_thread)
1008 {
1009 struct perftest_context *ctx = arg;
1010 print_progress(ctx->test_names, ctx->num_batch_files, result, ctx->flags,
1011 is_final, ctx->server_addr == NULL, is_multi_thread);
1012 }
1013
/* RTE callback table for the socket-based (standalone client/server) run;
 * exchange_vec is a no-op since post_vec sends the data eagerly. */
static ucx_perf_rte_t sock_rte = {
    .group_size   = sock_rte_group_size,
    .group_index  = sock_rte_group_index,
    .barrier      = sock_rte_barrier,
    .post_vec     = sock_rte_post_vec,
    .recv         = sock_rte_recv,
    .exchange_vec = (ucx_perf_rte_exchange_vec_func_t)ucs_empty_function,
    .report       = sock_rte_report,
};
1023
setup_sock_rte(struct perftest_context * ctx)1024 static ucs_status_t setup_sock_rte(struct perftest_context *ctx)
1025 {
1026 struct sockaddr_in inaddr;
1027 struct hostent *he;
1028 ucs_status_t status;
1029 int optval = 1;
1030 int sockfd, connfd;
1031 int ret;
1032
1033 sockfd = socket(AF_INET, SOCK_STREAM, 0);
1034 if (sockfd < 0) {
1035 ucs_error("socket() failed: %m");
1036 status = UCS_ERR_IO_ERROR;
1037 goto err;
1038 }
1039
1040 if (ctx->server_addr == NULL) {
1041 optval = 1;
1042 status = ucs_socket_setopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
1043 &optval, sizeof(optval));
1044 if (status != UCS_OK) {
1045 goto err_close_sockfd;
1046 }
1047
1048 inaddr.sin_family = AF_INET;
1049 inaddr.sin_port = htons(ctx->port);
1050 inaddr.sin_addr.s_addr = INADDR_ANY;
1051 memset(inaddr.sin_zero, 0, sizeof(inaddr.sin_zero));
1052 ret = bind(sockfd, (struct sockaddr*)&inaddr, sizeof(inaddr));
1053 if (ret < 0) {
1054 ucs_error("bind() failed: %m");
1055 status = UCS_ERR_INVALID_ADDR;
1056 goto err_close_sockfd;
1057 }
1058
1059 ret = listen(sockfd, 10);
1060 if (ret < 0) {
1061 ucs_error("listen() failed: %m");
1062 status = UCS_ERR_IO_ERROR;
1063 goto err_close_sockfd;
1064 }
1065
1066 printf("Waiting for connection...\n");
1067
1068 /* Accept next connection */
1069 connfd = accept(sockfd, NULL, NULL);
1070 if (connfd < 0) {
1071 ucs_error("accept() failed: %m");
1072 status = UCS_ERR_IO_ERROR;
1073 goto err_close_sockfd;
1074 }
1075
1076 close(sockfd);
1077
1078 ret = safe_recv(connfd, &ctx->params, sizeof(ctx->params), NULL, NULL);
1079 if (ret) {
1080 status = UCS_ERR_IO_ERROR;
1081 goto err_close_connfd;
1082 }
1083
1084 if (ctx->params.super.msg_size_cnt) {
1085 ctx->params.super.msg_size_list =
1086 calloc(ctx->params.super.msg_size_cnt,
1087 sizeof(*ctx->params.super.msg_size_list));
1088 if (NULL == ctx->params.super.msg_size_list) {
1089 status = UCS_ERR_NO_MEMORY;
1090 goto err_close_connfd;
1091 }
1092
1093 ret = safe_recv(connfd, ctx->params.super.msg_size_list,
1094 sizeof(*ctx->params.super.msg_size_list) *
1095 ctx->params.super.msg_size_cnt,
1096 NULL, NULL);
1097 if (ret) {
1098 status = UCS_ERR_IO_ERROR;
1099 goto err_close_connfd;
1100 }
1101 }
1102
1103 ctx->sock_rte_group.connfd = connfd;
1104 ctx->sock_rte_group.is_server = 1;
1105 } else {
1106 he = gethostbyname(ctx->server_addr);
1107 if (he == NULL || he->h_addr_list == NULL) {
1108 ucs_error("host %s not found: %s", ctx->server_addr,
1109 hstrerror(h_errno));
1110 status = UCS_ERR_INVALID_ADDR;
1111 goto err_close_sockfd;
1112 }
1113
1114 inaddr.sin_family = he->h_addrtype;
1115 inaddr.sin_port = htons(ctx->port);
1116 ucs_assert(he->h_length == sizeof(inaddr.sin_addr));
1117 memcpy(&inaddr.sin_addr, he->h_addr_list[0], he->h_length);
1118 memset(inaddr.sin_zero, 0, sizeof(inaddr.sin_zero));
1119
1120 ret = connect(sockfd, (struct sockaddr*)&inaddr, sizeof(inaddr));
1121 if (ret < 0) {
1122 ucs_error("connect() failed: %m");
1123 status = UCS_ERR_UNREACHABLE;
1124 goto err_close_sockfd;
1125 }
1126
1127 safe_send(sockfd, &ctx->params, sizeof(ctx->params), NULL, NULL);
1128 if (ctx->params.super.msg_size_cnt) {
1129 safe_send(sockfd, ctx->params.super.msg_size_list,
1130 sizeof(*ctx->params.super.msg_size_list) *
1131 ctx->params.super.msg_size_cnt,
1132 NULL, NULL);
1133 }
1134
1135 ctx->sock_rte_group.connfd = sockfd;
1136 ctx->sock_rte_group.is_server = 0;
1137 }
1138
1139 if (ctx->sock_rte_group.is_server) {
1140 ctx->flags |= TEST_FLAG_PRINT_TEST;
1141 } else {
1142 ctx->flags |= TEST_FLAG_PRINT_RESULTS;
1143 }
1144
1145 ctx->params.super.rte_group = &ctx->sock_rte_group;
1146 ctx->params.super.rte = &sock_rte;
1147 ctx->params.super.report_arg = ctx;
1148 return UCS_OK;
1149
1150 err_close_connfd:
1151 close(connfd);
1152 goto err;
1153 err_close_sockfd:
1154 close(sockfd);
1155 err:
1156 return status;
1157 }
1158
/* Tear down the socket RTE: close the connection to the peer */
static ucs_status_t cleanup_sock_rte(struct perftest_context *ctx)
{
    close(ctx->sock_rte_group.connfd);
    return UCS_OK;
}
1164
1165 #if defined (HAVE_MPI)
mpi_rte_group_size(void * rte_group)1166 static unsigned mpi_rte_group_size(void *rte_group)
1167 {
1168 int size;
1169 MPI_Comm_size(MPI_COMM_WORLD, &size);
1170 return size;
1171 }
1172
mpi_rte_group_index(void * rte_group)1173 static unsigned mpi_rte_group_index(void *rte_group)
1174 {
1175 int rank;
1176 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1177 return rank;
1178 }
1179
/*
 * Barrier across all MPI ranks that keeps invoking the caller's progress
 * callback while waiting. The surrounding OpenMP barriers make all threads
 * of each rank rendezvous around the single master-thread exchange.
 */
static void mpi_rte_barrier(void *rte_group, void (*progress)(void *arg),
                            void *arg)
{
    int group_size, my_rank, i;
    MPI_Request *reqs;
    int nreqs = 0;
    int dummy;
    int flag;

#pragma omp barrier

#pragma omp master
    {
        /*
         * Naive non-blocking barrier implementation over send/recv, to call user
         * progress while waiting for completion.
         * Not using MPI_Ibarrier to be compatible with MPI-1.
         */

        MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
        MPI_Comm_size(MPI_COMM_WORLD, &group_size);

        /* allocate maximal possible number of requests */
        reqs = (MPI_Request*)alloca(sizeof(*reqs) * group_size);

        if (my_rank == 0) {
            /* root gathers "ping" from all other ranks */
            for (i = 1; i < group_size; ++i) {
                MPI_Irecv(&dummy, 0, MPI_INT,
                          i /* source */,
                          1 /* tag */,
                          MPI_COMM_WORLD,
                          &reqs[nreqs++]);
            }
        } else {
            /* every non-root rank sends "ping" and waits for "pong" */
            MPI_Send(&dummy, 0, MPI_INT,
                     0 /* dest */,
                     1 /* tag */,
                     MPI_COMM_WORLD);
            MPI_Irecv(&dummy, 0, MPI_INT,
                      0 /* source */,
                      2 /* tag */,
                      MPI_COMM_WORLD,
                      &reqs[nreqs++]);
        }

        /* Wait for the receive requests while driving user progress */
        do {
            MPI_Testall(nreqs, reqs, &flag, MPI_STATUSES_IGNORE);
            progress(arg);
        } while (!flag);

        if (my_rank == 0) {
            /* root sends "pong" to all ranks */
            for (i = 1; i < group_size; ++i) {
                MPI_Send(&dummy, 0, MPI_INT,
                         i /* dest */,
                         2 /* tag */,
                         MPI_COMM_WORLD);
            }
        }
    }
#pragma omp barrier
}
1245
mpi_rte_post_vec(void * rte_group,const struct iovec * iovec,int iovcnt,void ** req)1246 static void mpi_rte_post_vec(void *rte_group, const struct iovec *iovec,
1247 int iovcnt, void **req)
1248 {
1249 int group_size;
1250 int my_rank;
1251 int dest, i;
1252
1253 MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
1254 MPI_Comm_size(MPI_COMM_WORLD, &group_size);
1255
1256 for (dest = 0; dest < group_size; ++dest) {
1257 if (dest == my_rank) {
1258 continue;
1259 }
1260
1261 for (i = 0; i < iovcnt; ++i) {
1262 MPI_Send(iovec[i].iov_base, iovec[i].iov_len, MPI_BYTE, dest,
1263 i == (iovcnt - 1), /* Send last iov with tag == 1 */
1264 MPI_COMM_WORLD);
1265 }
1266 }
1267
1268 *req = (void*)(uintptr_t)1;
1269 }
1270
mpi_rte_recv(void * rte_group,unsigned src,void * buffer,size_t max,void * req)1271 static void mpi_rte_recv(void *rte_group, unsigned src, void *buffer, size_t max,
1272 void *req)
1273 {
1274 MPI_Status status;
1275 size_t offset;
1276 int my_rank;
1277 int count;
1278
1279 MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
1280 if (src == my_rank) {
1281 return;
1282 }
1283
1284 offset = 0;
1285 do {
1286 ucs_assert_always(offset < max);
1287 MPI_Recv(buffer + offset, max - offset, MPI_BYTE, src, MPI_ANY_TAG,
1288 MPI_COMM_WORLD, &status);
1289 MPI_Get_count(&status, MPI_BYTE, &count);
1290 offset += count;
1291 } while (status.MPI_TAG != 1);
1292 }
1293
mpi_rte_report(void * rte_group,const ucx_perf_result_t * result,void * arg,int is_final,int is_multi_thread)1294 static void mpi_rte_report(void *rte_group, const ucx_perf_result_t *result,
1295 void *arg, int is_final, int is_multi_thread)
1296 {
1297 struct perftest_context *ctx = arg;
1298 print_progress(ctx->test_names, ctx->num_batch_files, result, ctx->flags,
1299 is_final, ctx->server_addr == NULL, is_multi_thread);
1300 }
1301 #elif defined (HAVE_RTE)
ext_rte_group_size(void * rte_group)1302 static unsigned ext_rte_group_size(void *rte_group)
1303 {
1304 rte_group_t group = (rte_group_t)rte_group;
1305 return rte_group_size(group);
1306 }
1307
ext_rte_group_index(void * rte_group)1308 static unsigned ext_rte_group_index(void *rte_group)
1309 {
1310 rte_group_t group = (rte_group_t)rte_group;
1311 return rte_group_rank(group);
1312 }
1313
ext_rte_barrier(void * rte_group,void (* progress)(void * arg),void * arg)1314 static void ext_rte_barrier(void *rte_group, void (*progress)(void *arg),
1315 void *arg)
1316 {
1317 #pragma omp barrier
1318
1319 #pragma omp master
1320 {
1321 rte_group_t group = (rte_group_t)rte_group;
1322 int rc;
1323
1324 rc = rte_barrier(group);
1325 if (RTE_SUCCESS != rc) {
1326 ucs_error("Failed to rte_barrier");
1327 }
1328 }
1329 #pragma omp barrier
1330 }
1331
ext_rte_post_vec(void * rte_group,const struct iovec * iovec,int iovcnt,void ** req)1332 static void ext_rte_post_vec(void *rte_group, const struct iovec* iovec,
1333 int iovcnt, void **req)
1334 {
1335 rte_group_t group = (rte_group_t)rte_group;
1336 rte_srs_session_t session;
1337 rte_iovec_t *r_vec;
1338 int i, rc;
1339
1340 rc = rte_srs_session_create(group, 0, &session);
1341 if (RTE_SUCCESS != rc) {
1342 ucs_error("Failed to rte_srs_session_create");
1343 }
1344
1345 r_vec = calloc(iovcnt, sizeof(rte_iovec_t));
1346 if (r_vec == NULL) {
1347 return;
1348 }
1349 for (i = 0; i < iovcnt; ++i) {
1350 r_vec[i].iov_base = iovec[i].iov_base;
1351 r_vec[i].type = rte_datatype_uint8_t;
1352 r_vec[i].count = iovec[i].iov_len;
1353 }
1354 rc = rte_srs_set_data(session, "KEY_PERF", r_vec, iovcnt);
1355 if (RTE_SUCCESS != rc) {
1356 ucs_error("Failed to rte_srs_set_data");
1357 }
1358 *req = session;
1359 free(r_vec);
1360 }
1361
/*
 * Receive the data staged through the SRS session ('req'): fetch the raw
 * blob for peer 'src', unpack up to 'max' bytes into 'buffer', then destroy
 * the session and release the blob.
 */
static void ext_rte_recv(void *rte_group, unsigned src, void *buffer,
                         size_t max, void *req)
{
    rte_group_t group         = (rte_group_t)rte_group;
    rte_srs_session_t session = (rte_srs_session_t)req;
    void *rte_buffer          = NULL;
    rte_iovec_t r_vec;
    uint32_t offset;
    int size;
    int rc;

    rc = rte_srs_get_data(session, rte_group_index_to_ec(group, src),
                          "KEY_PERF", &rte_buffer, &size);
    if (RTE_SUCCESS != rc) {
        ucs_error("Failed to rte_srs_get_data");
        return;
    }

    /* describe the destination buffer as a single byte vector */
    r_vec.iov_base = buffer;
    r_vec.type     = rte_datatype_uint8_t;
    r_vec.count    = max;

    offset = 0;
    rte_unpack(&r_vec, rte_buffer, &offset);

    rc = rte_srs_session_destroy(session);
    if (RTE_SUCCESS != rc) {
        ucs_error("Failed to rte_srs_session_destroy");
    }
    free(rte_buffer); /* blob was allocated by rte_srs_get_data */
}
1393
ext_rte_exchange_vec(void * rte_group,void * req)1394 static void ext_rte_exchange_vec(void *rte_group, void * req)
1395 {
1396 rte_srs_session_t session = (rte_srs_session_t)req;
1397 int rc;
1398
1399 rc = rte_srs_exchange_data(session);
1400 if (RTE_SUCCESS != rc) {
1401 ucs_error("Failed to rte_srs_exchange_data");
1402 }
1403 }
1404
ext_rte_report(void * rte_group,const ucx_perf_result_t * result,void * arg,int is_final,int is_multi_thread)1405 static void ext_rte_report(void *rte_group, const ucx_perf_result_t *result,
1406 void *arg, int is_final, int is_multi_thread)
1407 {
1408 struct perftest_context *ctx = arg;
1409 print_progress(ctx->test_names, ctx->num_batch_files, result, ctx->flags,
1410 is_final, ctx->server_addr == NULL, is_multi_thread);
1411 }
1412
/* RTE callback table for the external (librte) run-time environment */
static ucx_perf_rte_t ext_rte = {
    .group_size   = ext_rte_group_size,
    .group_index  = ext_rte_group_index,
    .barrier      = ext_rte_barrier,
    .report       = ext_rte_report,
    .post_vec     = ext_rte_post_vec,
    .recv         = ext_rte_recv,
    .exchange_vec = ext_rte_exchange_vec,
};
1422 #endif
1423
setup_mpi_rte(struct perftest_context * ctx)1424 static ucs_status_t setup_mpi_rte(struct perftest_context *ctx)
1425 {
1426 ucs_trace_func("");
1427
1428 #if defined (HAVE_MPI)
1429 static ucx_perf_rte_t mpi_rte = {
1430 .group_size = mpi_rte_group_size,
1431 .group_index = mpi_rte_group_index,
1432 .barrier = mpi_rte_barrier,
1433 .post_vec = mpi_rte_post_vec,
1434 .recv = mpi_rte_recv,
1435 .exchange_vec = (void*)ucs_empty_function,
1436 .report = mpi_rte_report,
1437 };
1438
1439 int size, rank;
1440
1441 MPI_Comm_size(MPI_COMM_WORLD, &size);
1442 if (size != 2) {
1443 ucs_error("This test should run with exactly 2 processes (actual: %d)", size);
1444 return UCS_ERR_INVALID_PARAM;
1445 }
1446
1447 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1448 if (rank == 1) {
1449 ctx->flags |= TEST_FLAG_PRINT_RESULTS;
1450 }
1451
1452 ctx->params.super.rte_group = NULL;
1453 ctx->params.super.rte = &mpi_rte;
1454 ctx->params.super.report_arg = ctx;
1455 #elif defined (HAVE_RTE)
1456 ctx->params.rte_group = NULL;
1457 ctx->params.rte = &mpi_rte;
1458 ctx->params.report_arg = ctx;
1459 rte_group_t group;
1460
1461 rte_init(NULL, NULL, &group);
1462 if (1 == rte_group_rank(group)) {
1463 ctx->flags |= TEST_FLAG_PRINT_RESULTS;
1464 }
1465
1466 ctx->params.super.rte_group = group;
1467 ctx->params.super.rte = &ext_rte;
1468 ctx->params.super.report_arg = ctx;
1469 #endif
1470 return UCS_OK;
1471 }
1472
/* Tear down the MPI/librte RTE (MPI itself is finalized in main()) */
static ucs_status_t cleanup_mpi_rte(struct perftest_context *ctx)
{
#ifdef HAVE_RTE
    rte_finalize();
#endif
    return UCS_OK;
}
1480
check_system(struct perftest_context * ctx)1481 static ucs_status_t check_system(struct perftest_context *ctx)
1482 {
1483 ucs_sys_cpuset_t cpuset;
1484 unsigned i, count, nr_cpus;
1485 int ret;
1486
1487 ucs_trace_func("");
1488
1489 ret = sysconf(_SC_NPROCESSORS_CONF);
1490 if (ret < 0) {
1491 ucs_error("failed to get local cpu count: %m");
1492 return UCS_ERR_INVALID_PARAM;
1493 }
1494 nr_cpus = ret;
1495
1496 memset(&cpuset, 0, sizeof(cpuset));
1497 if (ctx->flags & TEST_FLAG_SET_AFFINITY) {
1498 for (i = 0; i < ctx->num_cpus; i++) {
1499 if (ctx->cpus[i] >= nr_cpus) {
1500 ucs_error("cpu (%u) out of range (0..%u)", ctx->cpus[i], nr_cpus - 1);
1501 return UCS_ERR_INVALID_PARAM;
1502 }
1503 }
1504
1505 for (i = 0; i < ctx->num_cpus; i++) {
1506 CPU_SET(ctx->cpus[i], &cpuset);
1507 }
1508
1509 ret = ucs_sys_setaffinity(&cpuset);
1510 if (ret) {
1511 ucs_warn("sched_setaffinity() failed: %m");
1512 return UCS_ERR_INVALID_PARAM;
1513 }
1514 } else {
1515 ret = ucs_sys_getaffinity(&cpuset);
1516 if (ret) {
1517 ucs_warn("sched_getaffinity() failed: %m");
1518 return UCS_ERR_INVALID_PARAM;
1519 }
1520
1521 count = 0;
1522 for (i = 0; i < CPU_SETSIZE; ++i) {
1523 if (CPU_ISSET(i, &cpuset)) {
1524 ++count;
1525 }
1526 }
1527 if (count > 2) {
1528 ucs_warn("CPU affinity is not set (bound to %u cpus)."
1529 " Performance may be impacted.", count);
1530 }
1531 }
1532
1533 return UCS_OK;
1534 }
1535
clone_params(perftest_params_t * dest,const perftest_params_t * src)1536 static ucs_status_t clone_params(perftest_params_t *dest,
1537 const perftest_params_t *src)
1538 {
1539 size_t msg_size_list_size;
1540
1541 *dest = *src;
1542 msg_size_list_size = dest->super.msg_size_cnt *
1543 sizeof(*dest->super.msg_size_list);
1544 dest->super.msg_size_list = malloc(msg_size_list_size);
1545 if (dest->super.msg_size_list == NULL) {
1546 return ((msg_size_list_size != 0) ? UCS_ERR_NO_MEMORY : UCS_OK);
1547 }
1548
1549 memcpy(dest->super.msg_size_list, src->super.msg_size_list,
1550 msg_size_list_size);
1551 return UCS_OK;
1552 }
1553
/*
 * Run the test, recursing over the configured batch files: at each depth one
 * line of the corresponding batch file is applied on top of the parent
 * parameters and the recursion continues with the next file; once all batch
 * files are consumed, the accumulated parameters are executed.
 *
 * Returns UCS_OK when every batch line was processed (or, with no batch
 * files, the status of the single run).
 */
static ucs_status_t run_test_recurs(struct perftest_context *ctx,
                                    const perftest_params_t *parent_params,
                                    unsigned depth)
{
    perftest_params_t params;
    ucx_perf_result_t result;
    ucs_status_t status;
    FILE *batch_file;
    int line_num;

    ucs_trace_func("depth=%u, num_files=%u", depth, ctx->num_batch_files);

    if (parent_params->super.api == UCX_PERF_API_UCP) {
        /* UCT-only resource selectors are meaningless for UCP tests */
        if (strcmp(parent_params->super.uct.dev_name, TL_RESOURCE_NAME_NONE)) {
            ucs_warn("-d '%s' ignored for UCP test; see NOTES section in help message",
                     parent_params->super.uct.dev_name);
        }
        if (strcmp(parent_params->super.uct.tl_name, TL_RESOURCE_NAME_NONE)) {
            ucs_warn("-x '%s' ignored for UCP test; see NOTES section in help message",
                     parent_params->super.uct.tl_name);
        }
    }

    if (depth >= ctx->num_batch_files) {
        /* all batch files applied - run the actual measurement */
        print_test_name(ctx);
        return ucx_perf_run(&parent_params->super, &result);
    }

    batch_file = fopen(ctx->batch_files[depth], "r");
    if (batch_file == NULL) {
        ucs_error("Failed to open batch file '%s': %m", ctx->batch_files[depth]);
        return UCS_ERR_IO_ERROR;
    }

    /* each batch line starts from a fresh copy of the parent parameters */
    status = clone_params(&params, parent_params);
    if (status != UCS_OK) {
        goto out;
    }

    line_num = 0;
    while ((status = read_batch_file(batch_file, ctx->batch_files[depth],
                                     &line_num, &params,
                                     &ctx->test_names[depth])) == UCS_OK) {
        run_test_recurs(ctx, &params, depth + 1);
        /* release per-line ownership before re-cloning for the next line */
        free(params.super.msg_size_list);
        free(ctx->test_names[depth]);
        ctx->test_names[depth] = NULL;

        status = clone_params(&params, parent_params);
        if (status != UCS_OK) {
            goto out;
        }
    }

    /* UCS_ERR_NO_ELEM marks a clean end-of-file, not a failure */
    if (status == UCS_ERR_NO_ELEM) {
        status = UCS_OK;
    }

    free(params.super.msg_size_list);
out:
    fclose(batch_file);
    return status;
}
1617
run_test(struct perftest_context * ctx)1618 static ucs_status_t run_test(struct perftest_context *ctx)
1619 {
1620 const char *error_prefix;
1621 ucs_status_t status;
1622
1623 ucs_trace_func("");
1624
1625 setlocale(LC_ALL, "en_US");
1626
1627 /* no batch files, only command line params */
1628 if (ctx->num_batch_files == 0) {
1629 error_prefix = (ctx->flags & TEST_FLAG_PRINT_RESULTS) ?
1630 "command line: " : "";
1631 status = adjust_test_params(&ctx->params, error_prefix);
1632 if (status != UCS_OK) {
1633 return status;
1634 }
1635 }
1636
1637 print_header(ctx);
1638
1639 status = run_test_recurs(ctx, &ctx->params, 0);
1640 if (status != UCS_OK) {
1641 ucs_error("Failed to run test: %s", ucs_status_string(status));
1642 }
1643
1644 return status;
1645 }
1646
main(int argc,char ** argv)1647 int main(int argc, char **argv)
1648 {
1649 struct perftest_context ctx;
1650 ucs_status_t status;
1651 int mpi_initialized;
1652 int mpi_rte;
1653 int ret;
1654
1655 #ifdef HAVE_MPI
1656 int provided;
1657
1658 mpi_initialized = !isatty(0) &&
1659 /* Using MPI_THREAD_FUNNELED since ucx_perftest supports
1660 * using multiple threads when only the main one makes
1661 * MPI calls (which is also suitable for a single threaded
1662 * run).
1663 * MPI_THREAD_FUNNELED:
1664 * The process may be multi-threaded, but only the main
1665 * thread will make MPI calls (all MPI calls are funneled
1666 * to the main thread). */
1667 (MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided) == 0);
1668
1669 if (mpi_initialized && (provided != MPI_THREAD_FUNNELED)) {
1670 printf("MPI_Init_thread failed to set MPI_THREAD_FUNNELED. (provided = %d)\n",
1671 provided);
1672 ret = -1;
1673 goto out;
1674 }
1675 #else
1676 mpi_initialized = 0;
1677 #endif
1678
1679 /* Parse command line */
1680 status = parse_opts(&ctx, mpi_initialized, argc, argv);
1681 if (status != UCS_OK) {
1682 ret = (status == UCS_ERR_CANCELED) ? 0 : -127;
1683 goto out_msg_size_list;
1684 }
1685
1686 #ifdef __COVERITY__
1687 /* coverity[dont_call] */
1688 mpi_rte = rand(); /* Shut up deadcode error */
1689 #endif
1690
1691 if (ctx.mpi) {
1692 mpi_rte = 1;
1693 } else {
1694 #ifdef HAVE_RTE
1695 mpi_rte = 1;
1696 #else
1697 mpi_rte = 0;
1698 #endif
1699 }
1700
1701 status = check_system(&ctx);
1702 if (status != UCS_OK) {
1703 ret = -1;
1704 goto out_msg_size_list;
1705 }
1706
1707 /* Create RTE */
1708 status = (mpi_rte) ? setup_mpi_rte(&ctx) : setup_sock_rte(&ctx);
1709 if (status != UCS_OK) {
1710 ret = -1;
1711 goto out_msg_size_list;
1712 }
1713
1714 /* Run the test */
1715 status = run_test(&ctx);
1716 if (status != UCS_OK) {
1717 ret = -1;
1718 goto out_cleanup_rte;
1719 }
1720
1721 ret = 0;
1722
1723 out_cleanup_rte:
1724 (mpi_rte) ? cleanup_mpi_rte(&ctx) : cleanup_sock_rte(&ctx);
1725 out_msg_size_list:
1726 if (ctx.params.super.msg_size_list) {
1727 free(ctx.params.super.msg_size_list);
1728 }
1729 #if HAVE_MPI
1730 out:
1731 #endif
1732 if (mpi_initialized) {
1733 #ifdef HAVE_MPI
1734 MPI_Finalize();
1735 #endif
1736 }
1737 return ret;
1738 }
1739