1 /*
2 * Copyright (C) Mellanox Technologies Ltd. 2020. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
#include "ucx_wrapper.h"

#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <assert.h>
#include <getopt.h>
#include <stdint.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <limits>
#include <map>
#include <string>
#include <utility>
#include <vector>
23
/* Alignment (in bytes) applied to the start of each IO data buffer; every
 * data buffer is over-allocated by this many bytes to leave room for the
 * padding needed to reach it */
#define ALIGNMENT 4096
25
26
/* IO operation type, carried in every IO message header */
enum io_op_t {
    IO_READ,   /* client asks the server to send data */
    IO_WRITE,  /* client sends data to the server */
    IO_COMP    /* completion notification; also the number of data ops above */
};

/* Printable name of each io_op_t value, indexed by the enum value */
static const char *io_op_names[IO_COMP + 1] = {
    "read",
    "write",
    "completion"
};
39
40 /* test options */
41 typedef struct {
42 const char *server_addr;
43 int port_num;
44 long client_retries;
45 double client_timeout;
46 double client_runtime_limit;
47 size_t iomsg_size;
48 size_t min_data_size;
49 size_t max_data_size;
50 size_t chunk_size;
51 long iter_count;
52 long window_size;
53 std::vector<io_op_t> operations;
54 unsigned random_seed;
55 size_t num_buffers;
56 bool verbose;
57 } options_t;
58
/* Log tagged with "[DEMO]", always enabled (UcxLog comes from ucx_wrapper.h;
 * presumably its second argument enables the output — confirm there) */
#define LOG UcxLog("[DEMO]", true)
/* Same, enabled only in verbose mode; requires a `_test_opts` (options_t) in
 * scope, e.g. inside P2pDemoCommon and its subclasses */
#define VERBOSE_LOG UcxLog("[DEMO]", _test_opts.verbose)
61
62 template<class T>
63 class MemoryPool {
64 public:
MemoryPool(size_t buffer_size=0)65 MemoryPool(size_t buffer_size = 0) :
66 _num_allocated(0), _buffer_size(buffer_size) {
67 }
68
~MemoryPool()69 ~MemoryPool() {
70 if (_num_allocated != _freelist.size()) {
71 LOG << "Some items were not freed. Total:" << _num_allocated
72 << ", current:" << _freelist.size() << ".";
73 }
74
75 for (size_t i = 0; i < _freelist.size(); i++) {
76 delete _freelist[i];
77 }
78 }
79
get()80 T * get() {
81 T * item;
82
83 if (_freelist.empty()) {
84 item = new T(_buffer_size, this);
85 _num_allocated++;
86 } else {
87 item = _freelist.back();
88 _freelist.pop_back();
89 }
90 return item;
91 }
92
put(T * item)93 void put(T * item) {
94 _freelist.push_back(item);
95 }
96
97 private:
98 std::vector<T*> _freelist;
99 uint32_t _num_allocated;
100 size_t _buffer_size;
101 };
102
/**
 * Linear congruential generator (LCG):
 *     n[i + 1] = (n[i] * A + C) mod 2^31
 * with the A and C constants used by glibc; the modulo is applied by masking
 * with 0x7fffffff.
 *
 * The generator state is a single static variable shared by all callers, so
 * a given seed passed to srand() always yields the same sequence.
 */
class IoDemoRandom {
public:
    static void srand(unsigned seed) {
        _seed = seed & _lcg_mask;
    }

    /**
     * Returns a pseudo-random integer in the closed range [min, max]; bounds
     * given in the wrong order are swapped. The range width is computed in
     * 64 bits, so wide ranges (including the default full int range) do not
     * overflow.
     */
    static inline int rand(int min = std::numeric_limits<int>::min(),
                           int max = std::numeric_limits<int>::max()) {
        if (min > max) {
            std::swap(min, max);
        }

        _seed = (_seed * _lcg_a + _lcg_c) & _lcg_mask;

        const int64_t range = static_cast<int64_t>(max) - min + 1;
        /* To resolve that LCG returns alternating even/odd values */
        if (range == 2) {
            return (_seed & 0x100) ? max : min;
        }

        return static_cast<int>(min + static_cast<int64_t>(_seed) % range);
    }

private:
    /* Identifiers starting with an underscore and a capital letter are
     * reserved (some BSD <ctype.h> headers even define _C as a macro) */
    static unsigned       _seed;
    static const unsigned _lcg_a;
    static const unsigned _lcg_c;
    static const unsigned _lcg_mask;
};
unsigned IoDemoRandom::_seed           = 0;
const unsigned IoDemoRandom::_lcg_a    = 1103515245U;
const unsigned IoDemoRandom::_lcg_c    = 12345U;
const unsigned IoDemoRandom::_lcg_mask = 0x7fffffffU;
135
136 class P2pDemoCommon : public UcxContext {
137 protected:
138
139 /* IO request header */
140 typedef struct {
141 io_op_t op;
142 uint32_t sn;
143 size_t data_size;
144 } iomsg_hdr_t;
145
146 typedef enum {
147 XFER_TYPE_SEND,
148 XFER_TYPE_RECV
149 } xfer_type_t;
150
151 /* Asynchronous IO message */
152 class IoMessage : public UcxCallback {
153 public:
IoMessage(size_t buffer_size,MemoryPool<IoMessage> * pool)154 IoMessage(size_t buffer_size, MemoryPool<IoMessage>* pool) {
155 _buffer = malloc(buffer_size);
156 _pool = pool;
157 _buffer_size = buffer_size;
158 }
159
init(io_op_t op,uint32_t sn,size_t data_size)160 void init(io_op_t op, uint32_t sn, size_t data_size) {
161 iomsg_hdr_t *hdr = reinterpret_cast<iomsg_hdr_t*>(_buffer);
162 assert(sizeof(*hdr) <= _buffer_size);
163 hdr->op = op;
164 hdr->sn = sn;
165 hdr->data_size = data_size;
166 }
167
~IoMessage()168 ~IoMessage() {
169 free(_buffer);
170 }
171
operator ()(ucs_status_t status)172 virtual void operator()(ucs_status_t status) {
173 _pool->put(this);
174 }
175
buffer()176 void *buffer() {
177 return _buffer;
178 }
179
180 private:
181 void* _buffer;
182 MemoryPool<IoMessage>* _pool;
183 size_t _buffer_size;
184 };
185
P2pDemoCommon(const options_t & test_opts)186 P2pDemoCommon(const options_t& test_opts) :
187 UcxContext(test_opts.iomsg_size), _test_opts(test_opts),
188 _io_msg_pool(opts().iomsg_size), _cur_buffer_idx(0), _padding(0) {
189
190 _data_buffers.resize(opts().num_buffers);
191 for (size_t i = 0; i < _data_buffers.size(); ++i) {
192 std::string &data_buffer = _data_buffers[i];
193 data_buffer.resize(opts().max_data_size + ALIGNMENT);
194 uintptr_t ptr = (uintptr_t)&data_buffer[0];
195 _padding = ((ptr + ALIGNMENT - 1) & ~(ALIGNMENT - 1)) - ptr;
196 }
197 }
198
opts() const199 const options_t& opts() const {
200 return _test_opts;
201 }
202
buffer()203 inline void *buffer() {
204 return &_data_buffers[_cur_buffer_idx][_padding];
205 }
206
buffer(size_t offset)207 inline void *buffer(size_t offset) {
208 return &_data_buffers[_cur_buffer_idx][_padding + offset];
209 }
210
next_buffer()211 inline void next_buffer() {
212 _cur_buffer_idx = (_cur_buffer_idx + 1) % _data_buffers.size();
213 assert(_cur_buffer_idx < opts().num_buffers);
214 }
215
get_data_size()216 inline size_t get_data_size() {
217 return IoDemoRandom::rand(opts().min_data_size,
218 opts().max_data_size);
219 }
220
send_io_message(UcxConnection * conn,io_op_t op,uint32_t sn,size_t data_size)221 bool send_io_message(UcxConnection *conn, io_op_t op,
222 uint32_t sn, size_t data_size) {
223 IoMessage *m = _io_msg_pool.get();
224 m->init(op, sn, data_size);
225 VERBOSE_LOG << "sending IO " << io_op_names[op] << ", sn " << sn
226 << " data size " << data_size;
227 return conn->send_io_message(m->buffer(), opts().iomsg_size, m);
228 }
229
send_recv_data_as_chunks(UcxConnection * conn,size_t data_size,uint32_t sn,xfer_type_t send_recv_data,UcxCallback * callback=EmptyCallback::get ())230 void send_recv_data_as_chunks(UcxConnection* conn, size_t data_size, uint32_t sn,
231 xfer_type_t send_recv_data,
232 UcxCallback* callback = EmptyCallback::get()) {
233 size_t remaining = data_size;
234 while (remaining > 0) {
235 size_t xfer_size = std::min(opts().chunk_size, remaining);
236 if (send_recv_data == XFER_TYPE_SEND) {
237 conn->send_data(buffer(data_size - remaining), xfer_size, sn, callback);
238 } else {
239 conn->recv_data(buffer(data_size - remaining), xfer_size, sn, callback);
240 }
241 remaining -= xfer_size;
242 }
243 }
244
send_data_as_chunks(UcxConnection * conn,size_t data_size,uint32_t sn,UcxCallback * callback=EmptyCallback::get ())245 void send_data_as_chunks(UcxConnection* conn, size_t data_size, uint32_t sn,
246 UcxCallback* callback = EmptyCallback::get()) {
247 send_recv_data_as_chunks(conn, data_size, sn, XFER_TYPE_SEND, callback);
248 }
249
recv_data_as_chunks(UcxConnection * conn,size_t data_size,uint32_t sn,UcxCallback * callback=EmptyCallback::get ())250 void recv_data_as_chunks(UcxConnection* conn, size_t data_size, uint32_t sn,
251 UcxCallback* callback = EmptyCallback::get()) {
252 send_recv_data_as_chunks(conn, data_size, sn, XFER_TYPE_RECV, callback);
253 }
254
get_chunk_cnt(size_t data_size)255 uint32_t get_chunk_cnt(size_t data_size) {
256 return (data_size + opts().chunk_size - 1) / opts().chunk_size;
257 }
258
send_data(UcxConnection * conn,size_t data_size,uint32_t sn,UcxCallback * callback=EmptyCallback::get ())259 void send_data(UcxConnection* conn, size_t data_size, uint32_t sn,
260 UcxCallback* callback = EmptyCallback::get()) {
261 send_data_as_chunks(conn, data_size, sn, callback);
262 }
263
recv_data(UcxConnection * conn,size_t data_size,uint32_t sn,UcxCallback * callback=EmptyCallback::get ())264 void recv_data(UcxConnection* conn, size_t data_size, uint32_t sn,
265 UcxCallback* callback = EmptyCallback::get()) {
266 recv_data_as_chunks(conn, data_size, sn, callback);
267 }
268
269 protected:
270 const options_t _test_opts;
271 MemoryPool<IoMessage> _io_msg_pool;
272
273 private:
274 std::vector<std::string> _data_buffers;
275 size_t _cur_buffer_idx;
276 size_t _padding;
277 };
278
279
280 class DemoServer : public P2pDemoCommon {
281 public:
282 // sends an IO response when done
283 class IoWriteResponseCallback : public UcxCallback {
284 public:
IoWriteResponseCallback(size_t buffer_size,MemoryPool<IoWriteResponseCallback> * pool)285 IoWriteResponseCallback(size_t buffer_size,
286 MemoryPool<IoWriteResponseCallback>* pool) :
287 _server(NULL), _conn(NULL), _sn(0), _data_size(0), _chunk_cnt(0) {
288 _pool = pool;
289 }
290
init(DemoServer * server,UcxConnection * conn,uint32_t sn,size_t data_size,uint32_t chunk_cnt=1)291 void init(DemoServer *server, UcxConnection* conn, uint32_t sn,
292 size_t data_size, uint32_t chunk_cnt = 1) {
293 _server = server;
294 _conn = conn;
295 _sn = sn;
296 _data_size = data_size;
297 _chunk_cnt = chunk_cnt;
298 }
299
operator ()(ucs_status_t status)300 virtual void operator()(ucs_status_t status) {
301 if (--_chunk_cnt > 0) {
302 return;
303 }
304 if (status == UCS_OK) {
305 _server->send_io_message(_conn, IO_COMP, _sn, _data_size);
306 }
307 _pool->put(this);
308 }
309
310 private:
311 DemoServer* _server;
312 UcxConnection* _conn;
313 uint32_t _sn;
314 size_t _data_size;
315 uint32_t _chunk_cnt;
316 MemoryPool<IoWriteResponseCallback>* _pool;
317 };
318
DemoServer(const options_t & test_opts)319 DemoServer(const options_t& test_opts) :
320 P2pDemoCommon(test_opts), _callback_pool(0) {
321 }
322
run()323 void run() {
324 struct sockaddr_in listen_addr;
325 memset(&listen_addr, 0, sizeof(listen_addr));
326 listen_addr.sin_family = AF_INET;
327 listen_addr.sin_addr.s_addr = INADDR_ANY;
328 listen_addr.sin_port = htons(opts().port_num);
329
330 listen((const struct sockaddr*)&listen_addr, sizeof(listen_addr));
331 for (;;) {
332 try {
333 progress();
334 } catch (const std::exception &e) {
335 std::cerr << e.what();
336 }
337 }
338 }
339
handle_io_read_request(UcxConnection * conn,const iomsg_hdr_t * hdr)340 void handle_io_read_request(UcxConnection* conn, const iomsg_hdr_t *hdr) {
341 // send data
342 VERBOSE_LOG << "sending IO read data";
343 assert(opts().max_data_size >= hdr->data_size);
344
345 send_data(conn, hdr->data_size, hdr->sn);
346
347 // send response as data
348 VERBOSE_LOG << "sending IO read response";
349 IoMessage *response = _io_msg_pool.get();
350 response->init(IO_COMP, hdr->sn, 0);
351 conn->send_data(response->buffer(), opts().iomsg_size, hdr->sn,
352 response);
353
354 next_buffer();
355 }
356
handle_io_write_request(UcxConnection * conn,const iomsg_hdr_t * hdr)357 void handle_io_write_request(UcxConnection* conn, const iomsg_hdr_t *hdr) {
358 VERBOSE_LOG << "receiving IO write data";
359 assert(opts().max_data_size >= hdr->data_size);
360 assert(hdr->data_size != 0);
361
362 IoWriteResponseCallback *w = _callback_pool.get();
363 w->init(this, conn, hdr->sn, hdr->data_size, get_chunk_cnt(hdr->data_size));
364 recv_data(conn, hdr->data_size, hdr->sn, w);
365
366 next_buffer();
367 }
368
dispatch_connection_error(UcxConnection * conn)369 virtual void dispatch_connection_error(UcxConnection *conn) {
370 LOG << "deleting connection " << conn;
371 delete conn;
372 }
373
dispatch_io_message(UcxConnection * conn,const void * buffer,size_t length)374 virtual void dispatch_io_message(UcxConnection* conn, const void *buffer,
375 size_t length) {
376 const iomsg_hdr_t *hdr = reinterpret_cast<const iomsg_hdr_t*>(buffer);
377
378 VERBOSE_LOG << "got io message " << io_op_names[hdr->op] << " sn "
379 << hdr->sn << " data size " << hdr->data_size << " conn "
380 << conn;
381
382 if (hdr->op == IO_READ) {
383 handle_io_read_request(conn, hdr);
384 } else if (hdr->op == IO_WRITE) {
385 handle_io_write_request(conn, hdr);
386 } else {
387 LOG << "Invalid opcode: " << hdr->op;
388 }
389 }
390 protected:
391 MemoryPool<IoWriteResponseCallback> _callback_pool;
392 };
393
394
395 class DemoClient : public P2pDemoCommon {
396 public:
397 class IoReadResponseCallback : public UcxCallback {
398 public:
IoReadResponseCallback(size_t buffer_size,MemoryPool<IoReadResponseCallback> * pool)399 IoReadResponseCallback(size_t buffer_size,
400 MemoryPool<IoReadResponseCallback>* pool) :
401 _counter(0), _io_counter(0), _chunk_cnt(0) {
402 _buffer = malloc(buffer_size);
403 _pool = pool;
404 }
405
init(long * counter,uint32_t chunk_cnt=1)406 void init(long *counter, uint32_t chunk_cnt = 1) {
407 _counter = 0;
408 _io_counter = counter;
409 _chunk_cnt = chunk_cnt;
410 }
411
~IoReadResponseCallback()412 ~IoReadResponseCallback() {
413 free(_buffer);
414 }
415
operator ()(ucs_status_t status)416 virtual void operator()(ucs_status_t status) {
417 /* wait data and response completion */
418 if (++_counter < (1 + _chunk_cnt)) {
419 return;
420 }
421
422 ++(*_io_counter);
423 _pool->put(this);
424 }
425
buffer()426 void* buffer() {
427 return _buffer;
428 }
429
430 private:
431 long _counter;
432 long* _io_counter;
433 uint32_t _chunk_cnt;
434 void* _buffer;
435 MemoryPool<IoReadResponseCallback>* _pool;
436 };
437
DemoClient(const options_t & test_opts)438 DemoClient(const options_t& test_opts) :
439 P2pDemoCommon(test_opts),
440 _num_sent(0), _num_completed(0), _status(OK), _start_time(get_time()),
441 _retry(0), _callback_pool(opts().iomsg_size) {
442 _status_str[OK] = "ok";
443 _status_str[ERROR] = "error";
444 _status_str[RUNTIME_EXCEEDED] = "run-time exceeded";
445 _status_str[CONN_RETRIES_EXCEEDED] = "connection retries exceeded";
446 }
447
448 typedef enum {
449 OK,
450 ERROR,
451 RUNTIME_EXCEEDED,
452 CONN_RETRIES_EXCEEDED
453 } status_t;
454
do_io_read(UcxConnection * conn,uint32_t sn)455 size_t do_io_read(UcxConnection *conn, uint32_t sn) {
456 size_t data_size = get_data_size();
457
458 if (!send_io_message(conn, IO_READ, sn, data_size)) {
459 return data_size;
460 }
461
462 ++_num_sent;
463 IoReadResponseCallback *r = _callback_pool.get();
464 r->init(&_num_completed, get_chunk_cnt(data_size));
465 recv_data(conn, data_size, sn, r);
466 conn->recv_data(r->buffer(), opts().iomsg_size, sn, r);
467 next_buffer();
468
469 return data_size;
470 }
471
do_io_write(UcxConnection * conn,uint32_t sn)472 size_t do_io_write(UcxConnection *conn, uint32_t sn) {
473 size_t data_size = get_data_size();
474
475 if (!send_io_message(conn, IO_WRITE, sn, data_size)) {
476 return data_size;
477 }
478
479 ++_num_sent;
480 VERBOSE_LOG << "sending data " << buffer() << " size "
481 << data_size << " sn " << sn;
482 send_data(conn, data_size, sn);
483 next_buffer();
484
485 return data_size;
486 }
487
dispatch_io_message(UcxConnection * conn,const void * buffer,size_t length)488 virtual void dispatch_io_message(UcxConnection* conn, const void *buffer,
489 size_t length) {
490 const iomsg_hdr_t *hdr = reinterpret_cast<const iomsg_hdr_t*>(buffer);
491
492 VERBOSE_LOG << "got io message " << io_op_names[hdr->op] << " sn "
493 << hdr->sn << " data size " << hdr->data_size
494 << " conn " << conn;
495
496 if (hdr->op == IO_COMP) {
497 ++_num_completed;
498 }
499 }
500
dispatch_connection_error(UcxConnection * conn)501 virtual void dispatch_connection_error(UcxConnection *conn) {
502 LOG << "setting error flag on connection " << conn;
503 _status = ERROR;
504 }
505
wait_for_responses(long max_outstanding)506 bool wait_for_responses(long max_outstanding) {
507 struct timeval tv_start = {};
508 bool timer_started = false;
509 struct timeval tv_curr, tv_diff;
510 long count;
511
512 count = 0;
513 while (((_num_sent - _num_completed) > max_outstanding) && (_status == OK)) {
514 if (count < 1000) {
515 progress();
516 ++count;
517 continue;
518 }
519
520 count = 0;
521
522 gettimeofday(&tv_curr, NULL);
523
524 if (!timer_started) {
525 tv_start = tv_curr;
526 timer_started = true;
527 continue;
528 }
529
530 timersub(&tv_curr, &tv_start, &tv_diff);
531 double elapsed = tv_diff.tv_sec + (tv_diff.tv_usec * 1e-6);
532 if (elapsed > _test_opts.client_timeout * 10) {
533 LOG << "timeout waiting for " << (_num_sent - _num_completed)
534 << " replies";
535 _status = ERROR;
536 }
537 }
538
539 return (_status == OK);
540 }
541
connect()542 UcxConnection* connect() {
543 struct sockaddr_in connect_addr;
544 memset(&connect_addr, 0, sizeof(connect_addr));
545 connect_addr.sin_family = AF_INET;
546 connect_addr.sin_port = htons(opts().port_num);
547 inet_pton(AF_INET, opts().server_addr, &connect_addr.sin_addr);
548
549 return UcxContext::connect((const struct sockaddr*)&connect_addr,
550 sizeof(connect_addr));
551 }
552
get_time()553 static double get_time() {
554 struct timeval tv;
555 gettimeofday(&tv, NULL);
556 return tv.tv_sec + (tv.tv_usec * 1e-6);
557 }
558
get_time_str()559 static std::string get_time_str() {
560 char str[80];
561 struct timeval tv;
562 gettimeofday(&tv, NULL);
563 snprintf(str, sizeof(str), "[%lu.%06lu]", tv.tv_sec, tv.tv_usec);
564 return str;
565 }
566
run()567 bool run() {
568 UcxConnection* conn = connect();
569 if (!conn) {
570 return false;
571 }
572
573 _status = OK;
574
575 // TODO reset these values by canceling requests
576 _num_sent = 0;
577 _num_completed = 0;
578
579 double prev_time = get_time();
580 long total_iter = 0;
581 long total_prev_iter = 0;
582 std::vector<op_info_t> info;
583
584 for (int i = 0; i < IO_COMP; ++i) {
585 op_info_t op_info = {static_cast<io_op_t>(i), 0, 0};
586 info.push_back(op_info);
587 }
588
589 while ((total_iter < opts().iter_count) && (_status == OK)) {
590 VERBOSE_LOG << " <<<< iteration " << total_iter << " >>>>";
591
592 if (!wait_for_responses(opts().window_size - 1)) {
593 break;
594 }
595
596 io_op_t op = get_op();
597 size_t size;
598 switch (op) {
599 case IO_READ:
600 size = do_io_read(conn, total_iter);
601 break;
602 case IO_WRITE:
603 size = do_io_write(conn, total_iter);
604 break;
605 default:
606 abort();
607 }
608
609 info[op].total_bytes += size;
610 info[op].num_iters++;
611
612 if (((total_iter % 10) == 0) && (total_iter > total_prev_iter)) {
613 double curr_time = get_time();
614 if (curr_time >= (prev_time + 1.0)) {
615 if (!wait_for_responses(0)) {
616 break;
617 }
618
619 report_performance(total_iter - total_prev_iter,
620 curr_time - prev_time, info);
621
622 total_prev_iter = total_iter;
623 prev_time = curr_time;
624
625 check_time_limit(curr_time);
626 }
627 }
628
629 ++total_iter;
630 }
631
632 if (wait_for_responses(0)) {
633 double curr_time = get_time();
634 report_performance(total_iter - total_prev_iter,
635 curr_time - prev_time, info);
636 check_time_limit(curr_time);
637 }
638
639 delete conn;
640 return (_status == OK) || (_status == RUNTIME_EXCEEDED);
641 }
642
643 // returns true if number of connection retries is exceeded
update_retry()644 bool update_retry() {
645 if (++_retry >= opts().client_retries) {
646 /* client failed all retries */
647 _status = CONN_RETRIES_EXCEEDED;
648 return true;
649 }
650
651 LOG << "retry " << _retry << "/" << opts().client_retries
652 << " in " << opts().client_timeout << " seconds";
653 usleep((int)(1e6 * opts().client_timeout));
654 return false;
655 }
656
get_status() const657 status_t get_status() const {
658 return _status;
659 }
660
get_status_str()661 const std::string& get_status_str() {
662 return _status_str[_status];
663 }
664
665 private:
666 typedef struct {
667 io_op_t op;
668 long num_iters;
669 size_t total_bytes;
670 } op_info_t;
671
get_op()672 inline io_op_t get_op() {
673 if (opts().operations.size() == 1) {
674 return opts().operations[0];
675 }
676
677 return opts().operations[IoDemoRandom::rand(
678 0, opts().operations.size() - 1)];
679 }
680
check_time_limit(double current_time)681 inline void check_time_limit(double current_time) {
682 if ((_status == OK) &&
683 ((current_time - _start_time) >= opts().client_runtime_limit)) {
684 _status = RUNTIME_EXCEEDED;
685 }
686 }
687
report_performance(long num_iters,double elapsed,std::vector<op_info_t> & info)688 void report_performance(long num_iters, double elapsed,
689 std::vector<op_info_t> &info) {
690 if (num_iters == 0) {
691 return;
692 }
693
694 double latency_usec = (elapsed / num_iters) * 1e6;
695 bool first_print = true;
696
697 for (unsigned i = 0; i < info.size(); ++i) {
698 op_info_t *op_info = &info[i];
699
700 if (!op_info->total_bytes) {
701 continue;
702 }
703
704 if (first_print) {
705 std::cout << get_time_str() << " ";
706 first_print = false;
707 } else {
708 // print comma for non-first printouts
709 std::cout << ", ";
710 }
711
712 double throughput_mbs = op_info->total_bytes /
713 elapsed / (1024.0 * 1024.0);
714
715 std::cout << op_info->num_iters << " "
716 << io_op_names[op_info->op] << "s at "
717 << throughput_mbs << " MB/s";
718
719 // reset for the next round
720 op_info->total_bytes = 0;
721 op_info->num_iters = 0;
722 }
723
724 if (!first_print) {
725 if (opts().window_size == 1) {
726 std::cout << ", average latency: " << latency_usec << " usec";
727 }
728 std::cout << std::endl;
729 }
730 }
731
732 private:
733 long _num_sent;
734 long _num_completed;
735 status_t _status;
736 std::map<status_t, std::string> _status_str;
737 double _start_time;
738 unsigned _retry;
739 protected:
740 MemoryPool<IoReadResponseCallback> _callback_pool;
741 };
742
set_data_size(char * str,options_t * test_opts)743 static int set_data_size(char *str, options_t *test_opts)
744 {
745 const static char token = ':';
746 char *val1, *val2;
747
748 if (strchr(str, token) == NULL) {
749 test_opts->min_data_size =
750 test_opts->max_data_size = strtol(str, NULL, 0);
751 return 0;
752 }
753
754 val1 = strtok(str, ":");
755 val2 = strtok(NULL, ":");
756
757 if ((val1 != NULL) && (val2 != NULL)) {
758 test_opts->min_data_size = strtol(val1, NULL, 0);
759 test_opts->max_data_size = strtol(val2, NULL, 0);
760 } else if (val1 != NULL) {
761 if (str[0] == ':') {
762 test_opts->min_data_size = 0;
763 test_opts->max_data_size = strtol(val1, NULL, 0);
764 } else {
765 test_opts->min_data_size = strtol(val1, NULL, 0);
766 }
767 } else {
768 return -1;
769 }
770
771 return 0;
772 }
773
/* Parse a time duration into seconds.
 *
 * Accepts "inf" (stored as the largest double), or a non-negative number
 * followed by an optional unit: h, m, s, ms, us or ns (no unit means
 * seconds). The whole string must be consumed.
 *
 * Returns 0 and stores the value in *dest_p on success; returns -1 on a
 * malformed, negative or NaN value, leaving *dest_p untouched. */
static int set_time(char *str, double *dest_p)
{
    /* number of the unit in one second, for every accepted suffix */
    static const struct {
        const char *suffix;
        double      per_sec;
    } units[] = {
        {"",   1},
        {"h",  1.0 / 3600.0},
        {"m",  1.0 / 60.0},
        {"s",  1},
        {"ms", 1e3},
        {"us", 1e6},
        {"ns", 1e9}
    };
    char *suffix;
    double value;

    if (!strcmp(str, "inf")) {
        *dest_p = std::numeric_limits<double>::max();
        return 0;
    }

    value = strtod(str, &suffix);
    if ((suffix == str) || !(value >= 0)) {
        /* no number at all, a negative duration, or NaN */
        return -1;
    }

    for (size_t i = 0; i < sizeof(units) / sizeof(units[0]); ++i) {
        if (!strcmp(suffix, units[i].suffix)) {
            *dest_p = value / units[i].per_sec;
            return 0;
        }
    }

    /* unknown unit or trailing characters */
    return -1;
}
812
parse_args(int argc,char ** argv,options_t * test_opts)813 static int parse_args(int argc, char **argv, options_t *test_opts)
814 {
815 char *str;
816 bool found;
817 int c;
818
819 test_opts->server_addr = NULL;
820 test_opts->port_num = 1337;
821 test_opts->client_retries = std::numeric_limits<long>::max();
822 test_opts->client_timeout = 1.0;
823 test_opts->client_runtime_limit = std::numeric_limits<double>::max();
824 test_opts->min_data_size = 4096;
825 test_opts->max_data_size = 4096;
826 test_opts->chunk_size = std::numeric_limits<unsigned>::max();
827 test_opts->num_buffers = 1;
828 test_opts->iomsg_size = 256;
829 test_opts->iter_count = 1000;
830 test_opts->window_size = 1;
831 test_opts->random_seed = std::time(NULL);
832 test_opts->verbose = false;
833
834 while ((c = getopt(argc, argv, "p:c:r:d:b:i:w:k:o:t:l:s:v")) != -1) {
835 switch (c) {
836 case 'p':
837 test_opts->port_num = atoi(optarg);
838 break;
839 case 'c':
840 if (strcmp(optarg, "inf")) {
841 test_opts->client_retries = strtol(optarg, NULL, 0);
842 }
843 break;
844 case 'r':
845 test_opts->iomsg_size = strtol(optarg, NULL, 0);
846 break;
847 case 'd':
848 if (set_data_size(optarg, test_opts) == -1) {
849 std::cout << "invalid data size range '" << optarg << "'" << std::endl;
850 return -1;
851 }
852 break;
853 case 'b':
854 test_opts->num_buffers = strtol(optarg, NULL, 0);
855 if (test_opts->num_buffers == 0) {
856 std::cout << "number of buffers ('" << optarg << "')"
857 << " has to be > 0" << std::endl;
858 return -1;
859 }
860 break;
861 case 'i':
862 test_opts->iter_count = strtol(optarg, NULL, 0);
863 break;
864 case 'w':
865 test_opts->window_size = atoi(optarg);
866 break;
867 case 'k':
868 test_opts->chunk_size = strtol(optarg, NULL, 0);
869 break;
870 case 'o':
871 str = strtok(optarg, ",");
872 while (str != NULL) {
873 found = false;
874
875 for (int op_it = 0; op_it < IO_COMP; ++op_it) {
876 if (!strcmp(io_op_names[op_it], str)) {
877 io_op_t op = static_cast<io_op_t>(op_it);
878 if (std::find(test_opts->operations.begin(),
879 test_opts->operations.end(),
880 op) == test_opts->operations.end()) {
881 test_opts->operations.push_back(op);
882 }
883 found = true;
884 }
885 }
886
887 if (!found) {
888 std::cout << "invalid operation name '" << str << "'" << std::endl;
889 return -1;
890 }
891
892 str = strtok(NULL, ",");
893 }
894
895 if (test_opts->operations.size() == 0) {
896 std::cout << "no operation names were provided '" << optarg << "'" << std::endl;
897 return -1;
898 }
899 break;
900 case 't':
901 if (set_time(optarg, &test_opts->client_timeout) != 0) {
902 std::cout << "invalid '" << optarg << "' value for client timeout" << std::endl;
903 return -1;
904 }
905 break;
906 case 'l':
907 if (set_time(optarg, &test_opts->client_runtime_limit) != 0) {
908 std::cout << "invalid '" << optarg << "' value for client run-time limit" << std::endl;
909 return -1;
910 }
911 break;
912 case 's':
913 test_opts->random_seed = strtoul(optarg, NULL, 0);
914 break;
915 case 'v':
916 test_opts->verbose = true;
917 break;
918 case 'h':
919 default:
920 std::cout << "Usage: io_demo [options] [server_address]" << std::endl;
921 std::cout << "" << std::endl;
922 std::cout << "Supported options are:" << std::endl;
923 std::cout << " -p <port> TCP port number to use" << std::endl;
924 std::cout << " -o <op1,op2,...,opN> Comma-separated string of IO operations [read|write]" << std::endl;
925 std::cout << " NOTE: if using several IO operations, performance" << std::endl;
926 std::cout << " measurments may be inaccurate" << std::endl;
927 std::cout << " -d <min>:<max> Range that should be used to get data" << std::endl;
928 std::cout << " size of IO payload" << std::endl;
929 std::cout << " -b <number of buffers> Number of IO buffers to use for communications" << std::endl;
930 std::cout << " -i <iterations-count> Number of iterations to run communication" << std::endl;
931 std::cout << " -w <window-size> Number of outstanding requests" << std::endl;
932 std::cout << " -k <chunk-size> Split the data transfer to chunks of this size" << std::endl;
933 std::cout << " -r <io-request-size> Size of IO request packet" << std::endl;
934 std::cout << " -c <client retries> Number of connection retries on client" << std::endl;
935 std::cout << " (or \"inf\") for failure" << std::endl;
936 std::cout << " -t <client timeout> Client timeout (or \"inf\")" << std::endl;
937 std::cout << " -l <client run-time limit> Time limit to run the IO client (or \"inf\")" << std::endl;
938 std::cout << " Examples: -l 17.5s; -l 10m; 15.5h" << std::endl;
939 std::cout << " -s <random seed> Random seed to use for randomizing" << std::endl;
940 std::cout << " -v Set verbose mode" << std::endl;
941 std::cout << "" << std::endl;
942 return -1;
943 }
944 }
945
946 if (optind < argc) {
947 test_opts->server_addr = argv[optind];
948 }
949
950 if (test_opts->operations.size() == 0) {
951 test_opts->operations.push_back(IO_WRITE);
952 }
953
954 return 0;
955 }
956
do_server(const options_t & test_opts)957 static int do_server(const options_t& test_opts)
958 {
959 DemoServer server(test_opts);
960 if (!server.init()) {
961 return -1;
962 }
963
964 server.run();
965 return 0;
966 }
967
do_client(const options_t & test_opts)968 static int do_client(const options_t& test_opts)
969 {
970 IoDemoRandom::srand(test_opts.random_seed);
971 LOG << "random seed: " << test_opts.random_seed;
972
973 DemoClient client(test_opts);
974 if (!client.init()) {
975 return -1;
976 }
977
978 for (;;) {
979 if (client.run()) {
980 /* successful run */
981 break;
982 }
983
984 if (client.update_retry()) {
985 break;
986 }
987 }
988
989 DemoClient::status_t status = client.get_status();
990 LOG << "client exit with \"" << client.get_status_str() << "\" status";
991 return ((status == DemoClient::OK) ||
992 (status == DemoClient::RUNTIME_EXCEEDED)) ? 0 : -1;
993 }
994
main(int argc,char ** argv)995 int main(int argc, char **argv)
996 {
997 options_t test_opts;
998 int ret;
999
1000 ret = parse_args(argc, argv, &test_opts);
1001 if (ret < 0) {
1002 return ret;
1003 }
1004
1005 if (test_opts.server_addr == NULL) {
1006 return do_server(test_opts);
1007 } else {
1008 return do_client(test_opts);
1009 }
1010 }
1011