1 /*
2  * Copyright (C) Mellanox Technologies Ltd. 2020.  ALL RIGHTS RESERVED.
3  *
4  * See file LICENSE for terms.
5  */
6 
7 #include "ucx_wrapper.h"
8 
9 #include <netinet/in.h>
10 #include <arpa/inet.h>
11 #include <sys/time.h>
12 #include <iostream>
13 #include <string.h>
14 #include <getopt.h>
15 #include <assert.h>
16 #include <unistd.h>
17 #include <cstdlib>
18 #include <ctime>
19 #include <vector>
20 #include <map>
21 #include <algorithm>
22 #include <limits>
23 
/* Alignment, in bytes, applied to the start of every data buffer (see the
 * P2pDemoCommon constructor). The round-up there is done with the mask
 * "~(ALIGNMENT - 1)", so this value must be a power of two. */
#define ALIGNMENT 4096


/* IO operation type.
 * The enumerator order is load-bearing:
 *  - values are used directly as indices into io_op_names[] below;
 *  - IO_COMP is used as the number of *request* types when iterating
 *    "op < IO_COMP" (IO_READ and IO_WRITE are sent by the client, IO_COMP is
 *    the completion message sent back by the server). */
typedef enum {
    IO_READ,
    IO_WRITE,
    IO_COMP
} io_op_t;

/* Human-readable names indexed by io_op_t; also matched against the "-o"
 * command-line operation list. Must stay in the same order as io_op_t. */
static const char *io_op_names[] = {
    "read",
    "write",
    "completion"
};
39 
40 /* test options */
/* test options (filled by parse_args) */
typedef struct {
    const char           *server_addr;          /* IPv4 address the client connects to (parsed with inet_pton); NULL by default */
    int                  port_num;              /* TCP port used for listen/connect (default 1337) */
    long                 client_retries;        /* max connection attempts before CONN_RETRIES_EXCEEDED */
    double               client_timeout;        /* seconds; delay between retries, and wait_for_responses gives up after 10x this */
    double               client_runtime_limit;  /* seconds; the client stops with RUNTIME_EXCEEDED once reached */
    size_t               iomsg_size;            /* size in bytes of every IO control message buffer */
    size_t               min_data_size;         /* lower bound of the random per-request data size */
    size_t               max_data_size;         /* upper bound of the random data size; also sizes each data buffer */
    size_t               chunk_size;            /* max bytes per individual data send/receive */
    long                 iter_count;            /* number of IO requests the client issues */
    long                 window_size;           /* max outstanding IO requests; latency is reported only when 1 */
    std::vector<io_op_t> operations;            /* request types the client picks from at random */
    unsigned             random_seed;           /* presumably passed to IoDemoRandom::srand by main() - confirm */
    size_t               num_buffers;           /* number of data buffers used in round-robin order */
    bool                 verbose;               /* enables VERBOSE_LOG output */
} options_t;

/* LOG always prints; VERBOSE_LOG prints only when the "verbose" option is set.
 * VERBOSE_LOG refers to a member named _test_opts, so it can only be used
 * inside classes that have one (P2pDemoCommon and its subclasses). */
#define LOG         UcxLog("[DEMO]", true)
#define VERBOSE_LOG UcxLog("[DEMO]", _test_opts.verbose)
61 
62 template<class T>
63 class MemoryPool {
64 public:
MemoryPool(size_t buffer_size=0)65     MemoryPool(size_t buffer_size = 0) :
66         _num_allocated(0), _buffer_size(buffer_size) {
67     }
68 
~MemoryPool()69     ~MemoryPool() {
70         if (_num_allocated != _freelist.size()) {
71             LOG << "Some items were not freed. Total:" << _num_allocated
72                 << ", current:" << _freelist.size() << ".";
73         }
74 
75         for (size_t i = 0; i < _freelist.size(); i++) {
76             delete _freelist[i];
77         }
78     }
79 
get()80     T * get() {
81         T * item;
82 
83         if (_freelist.empty()) {
84             item = new T(_buffer_size, this);
85             _num_allocated++;
86         } else {
87             item = _freelist.back();
88             _freelist.pop_back();
89         }
90         return item;
91     }
92 
put(T * item)93     void put(T * item) {
94         _freelist.push_back(item);
95     }
96 
97 private:
98     std::vector<T*> _freelist;
99     uint32_t        _num_allocated;
100     size_t          _buffer_size;
101 };
102 
103 /**
104  * Linear congruential generator (LCG):
105  * n[i + 1] = (n[i] * A + C) % M
106  * where A, C, M used as in glibc
107  */
108 class IoDemoRandom {
109 public:
srand(unsigned seed)110     static void srand(unsigned seed) {
111         _seed = seed & _M;
112     }
113 
rand(int min=std::numeric_limits<int>::min (),int max=std::numeric_limits<int>::max ())114     static inline int rand(int min = std::numeric_limits<int>::min(),
115                            int max = std::numeric_limits<int>::max()) {
116         _seed = (_seed * _A + _C) & _M;
117         /* To resolve that LCG returns alternating even/odd values */
118         if (max - min == 1) {
119             return (_seed & 0x100) ? max : min;
120         } else {
121             return (int)_seed % (max - min + 1) + min;
122         }
123     }
124 
125 private:
126     static       unsigned     _seed;
127     static const unsigned     _A;
128     static const unsigned     _C;
129     static const unsigned     _M;
130 };
131 unsigned IoDemoRandom::_seed    = 0;
132 const unsigned IoDemoRandom::_A = 1103515245U;
133 const unsigned IoDemoRandom::_C = 12345U;
134 const unsigned IoDemoRandom::_M = 0x7fffffffU;
135 
136 class P2pDemoCommon : public UcxContext {
137 protected:
138 
139     /* IO request header */
140     typedef struct {
141         io_op_t     op;
142         uint32_t    sn;
143         size_t      data_size;
144     } iomsg_hdr_t;
145 
146     typedef enum {
147         XFER_TYPE_SEND,
148         XFER_TYPE_RECV
149     } xfer_type_t;
150 
151     /* Asynchronous IO message */
152     class IoMessage : public UcxCallback {
153     public:
IoMessage(size_t buffer_size,MemoryPool<IoMessage> * pool)154         IoMessage(size_t buffer_size, MemoryPool<IoMessage>* pool) {
155             _buffer      = malloc(buffer_size);
156             _pool        = pool;
157             _buffer_size = buffer_size;
158         }
159 
init(io_op_t op,uint32_t sn,size_t data_size)160         void init(io_op_t op, uint32_t sn, size_t data_size) {
161             iomsg_hdr_t *hdr = reinterpret_cast<iomsg_hdr_t*>(_buffer);
162             assert(sizeof(*hdr) <= _buffer_size);
163             hdr->op          = op;
164             hdr->sn          = sn;
165             hdr->data_size   = data_size;
166         }
167 
~IoMessage()168         ~IoMessage() {
169             free(_buffer);
170         }
171 
operator ()(ucs_status_t status)172         virtual void operator()(ucs_status_t status) {
173             _pool->put(this);
174         }
175 
buffer()176         void *buffer() {
177             return _buffer;
178         }
179 
180     private:
181         void*                  _buffer;
182         MemoryPool<IoMessage>* _pool;
183         size_t                 _buffer_size;
184     };
185 
P2pDemoCommon(const options_t & test_opts)186     P2pDemoCommon(const options_t& test_opts) :
187         UcxContext(test_opts.iomsg_size), _test_opts(test_opts),
188         _io_msg_pool(opts().iomsg_size), _cur_buffer_idx(0), _padding(0) {
189 
190         _data_buffers.resize(opts().num_buffers);
191         for (size_t i = 0; i < _data_buffers.size(); ++i) {
192             std::string &data_buffer = _data_buffers[i];
193             data_buffer.resize(opts().max_data_size + ALIGNMENT);
194             uintptr_t ptr = (uintptr_t)&data_buffer[0];
195             _padding = ((ptr + ALIGNMENT - 1) & ~(ALIGNMENT - 1)) - ptr;
196         }
197     }
198 
opts() const199     const options_t& opts() const {
200         return _test_opts;
201     }
202 
buffer()203     inline void *buffer() {
204         return &_data_buffers[_cur_buffer_idx][_padding];
205     }
206 
buffer(size_t offset)207     inline void *buffer(size_t offset) {
208         return &_data_buffers[_cur_buffer_idx][_padding + offset];
209     }
210 
next_buffer()211     inline void next_buffer() {
212         _cur_buffer_idx = (_cur_buffer_idx + 1) % _data_buffers.size();
213         assert(_cur_buffer_idx < opts().num_buffers);
214     }
215 
get_data_size()216     inline size_t get_data_size() {
217         return IoDemoRandom::rand(opts().min_data_size,
218                                   opts().max_data_size);
219     }
220 
send_io_message(UcxConnection * conn,io_op_t op,uint32_t sn,size_t data_size)221     bool send_io_message(UcxConnection *conn, io_op_t op,
222                          uint32_t sn, size_t data_size) {
223         IoMessage *m = _io_msg_pool.get();
224         m->init(op, sn, data_size);
225         VERBOSE_LOG << "sending IO " << io_op_names[op] << ", sn " << sn
226                     << " data size " << data_size;
227         return conn->send_io_message(m->buffer(), opts().iomsg_size, m);
228     }
229 
send_recv_data_as_chunks(UcxConnection * conn,size_t data_size,uint32_t sn,xfer_type_t send_recv_data,UcxCallback * callback=EmptyCallback::get ())230     void send_recv_data_as_chunks(UcxConnection* conn, size_t data_size, uint32_t sn,
231                                   xfer_type_t send_recv_data,
232                                   UcxCallback* callback = EmptyCallback::get()) {
233         size_t remaining = data_size;
234         while (remaining > 0) {
235             size_t xfer_size = std::min(opts().chunk_size, remaining);
236             if (send_recv_data == XFER_TYPE_SEND) {
237                 conn->send_data(buffer(data_size - remaining), xfer_size, sn, callback);
238             } else {
239                 conn->recv_data(buffer(data_size - remaining), xfer_size, sn, callback);
240             }
241             remaining -= xfer_size;
242         }
243     }
244 
send_data_as_chunks(UcxConnection * conn,size_t data_size,uint32_t sn,UcxCallback * callback=EmptyCallback::get ())245     void send_data_as_chunks(UcxConnection* conn, size_t data_size, uint32_t sn,
246                              UcxCallback* callback = EmptyCallback::get()) {
247         send_recv_data_as_chunks(conn, data_size, sn, XFER_TYPE_SEND, callback);
248     }
249 
recv_data_as_chunks(UcxConnection * conn,size_t data_size,uint32_t sn,UcxCallback * callback=EmptyCallback::get ())250     void recv_data_as_chunks(UcxConnection* conn, size_t data_size, uint32_t sn,
251                              UcxCallback* callback = EmptyCallback::get()) {
252         send_recv_data_as_chunks(conn, data_size, sn, XFER_TYPE_RECV, callback);
253     }
254 
get_chunk_cnt(size_t data_size)255     uint32_t get_chunk_cnt(size_t data_size) {
256         return (data_size + opts().chunk_size - 1) / opts().chunk_size;
257     }
258 
send_data(UcxConnection * conn,size_t data_size,uint32_t sn,UcxCallback * callback=EmptyCallback::get ())259     void send_data(UcxConnection* conn, size_t data_size, uint32_t sn,
260                    UcxCallback* callback = EmptyCallback::get()) {
261         send_data_as_chunks(conn, data_size, sn, callback);
262     }
263 
recv_data(UcxConnection * conn,size_t data_size,uint32_t sn,UcxCallback * callback=EmptyCallback::get ())264     void recv_data(UcxConnection* conn, size_t data_size, uint32_t sn,
265                    UcxCallback* callback = EmptyCallback::get()) {
266         recv_data_as_chunks(conn, data_size, sn, callback);
267     }
268 
269 protected:
270     const options_t          _test_opts;
271     MemoryPool<IoMessage>    _io_msg_pool;
272 
273 private:
274     std::vector<std::string> _data_buffers;
275     size_t                   _cur_buffer_idx;
276     size_t                   _padding;
277 };
278 
279 
280 class DemoServer : public P2pDemoCommon {
281 public:
282     // sends an IO response when done
283     class IoWriteResponseCallback : public UcxCallback {
284     public:
IoWriteResponseCallback(size_t buffer_size,MemoryPool<IoWriteResponseCallback> * pool)285         IoWriteResponseCallback(size_t buffer_size,
286             MemoryPool<IoWriteResponseCallback>* pool) :
287             _server(NULL), _conn(NULL), _sn(0), _data_size(0), _chunk_cnt(0) {
288             _pool = pool;
289         }
290 
init(DemoServer * server,UcxConnection * conn,uint32_t sn,size_t data_size,uint32_t chunk_cnt=1)291         void init(DemoServer *server, UcxConnection* conn, uint32_t sn,
292                   size_t data_size, uint32_t chunk_cnt = 1) {
293              _server    = server;
294              _conn      = conn;
295              _sn        = sn;
296              _data_size = data_size;
297              _chunk_cnt = chunk_cnt;
298         }
299 
operator ()(ucs_status_t status)300         virtual void operator()(ucs_status_t status) {
301             if (--_chunk_cnt > 0) {
302                 return;
303             }
304             if (status == UCS_OK) {
305                 _server->send_io_message(_conn, IO_COMP, _sn, _data_size);
306             }
307             _pool->put(this);
308         }
309 
310     private:
311         DemoServer*                          _server;
312         UcxConnection*                       _conn;
313         uint32_t                             _sn;
314         size_t                               _data_size;
315         uint32_t                             _chunk_cnt;
316         MemoryPool<IoWriteResponseCallback>* _pool;
317     };
318 
DemoServer(const options_t & test_opts)319     DemoServer(const options_t& test_opts) :
320         P2pDemoCommon(test_opts), _callback_pool(0) {
321     }
322 
run()323     void run() {
324         struct sockaddr_in listen_addr;
325         memset(&listen_addr, 0, sizeof(listen_addr));
326         listen_addr.sin_family      = AF_INET;
327         listen_addr.sin_addr.s_addr = INADDR_ANY;
328         listen_addr.sin_port        = htons(opts().port_num);
329 
330         listen((const struct sockaddr*)&listen_addr, sizeof(listen_addr));
331         for (;;) {
332             try {
333                 progress();
334             } catch (const std::exception &e) {
335                 std::cerr << e.what();
336             }
337         }
338     }
339 
handle_io_read_request(UcxConnection * conn,const iomsg_hdr_t * hdr)340     void handle_io_read_request(UcxConnection* conn, const iomsg_hdr_t *hdr) {
341         // send data
342         VERBOSE_LOG << "sending IO read data";
343         assert(opts().max_data_size >= hdr->data_size);
344 
345         send_data(conn, hdr->data_size, hdr->sn);
346 
347         // send response as data
348         VERBOSE_LOG << "sending IO read response";
349         IoMessage *response = _io_msg_pool.get();
350         response->init(IO_COMP, hdr->sn, 0);
351         conn->send_data(response->buffer(), opts().iomsg_size, hdr->sn,
352                         response);
353 
354         next_buffer();
355     }
356 
handle_io_write_request(UcxConnection * conn,const iomsg_hdr_t * hdr)357     void handle_io_write_request(UcxConnection* conn, const iomsg_hdr_t *hdr) {
358         VERBOSE_LOG << "receiving IO write data";
359         assert(opts().max_data_size >= hdr->data_size);
360         assert(hdr->data_size != 0);
361 
362         IoWriteResponseCallback *w = _callback_pool.get();
363         w->init(this, conn, hdr->sn, hdr->data_size, get_chunk_cnt(hdr->data_size));
364         recv_data(conn, hdr->data_size, hdr->sn, w);
365 
366         next_buffer();
367     }
368 
dispatch_connection_error(UcxConnection * conn)369     virtual void dispatch_connection_error(UcxConnection *conn) {
370         LOG << "deleting connection " << conn;
371         delete conn;
372     }
373 
dispatch_io_message(UcxConnection * conn,const void * buffer,size_t length)374     virtual void dispatch_io_message(UcxConnection* conn, const void *buffer,
375                                      size_t length) {
376         const iomsg_hdr_t *hdr = reinterpret_cast<const iomsg_hdr_t*>(buffer);
377 
378         VERBOSE_LOG << "got io message " << io_op_names[hdr->op] << " sn "
379                     << hdr->sn << " data size " << hdr->data_size << " conn "
380                     << conn;
381 
382         if (hdr->op == IO_READ) {
383             handle_io_read_request(conn, hdr);
384         } else if (hdr->op == IO_WRITE) {
385             handle_io_write_request(conn, hdr);
386         } else {
387             LOG << "Invalid opcode: " << hdr->op;
388         }
389     }
390 protected:
391     MemoryPool<IoWriteResponseCallback> _callback_pool;
392 };
393 
394 
395 class DemoClient : public P2pDemoCommon {
396 public:
397     class IoReadResponseCallback : public UcxCallback {
398     public:
IoReadResponseCallback(size_t buffer_size,MemoryPool<IoReadResponseCallback> * pool)399         IoReadResponseCallback(size_t buffer_size,
400             MemoryPool<IoReadResponseCallback>* pool) :
401             _counter(0), _io_counter(0), _chunk_cnt(0) {
402             _buffer = malloc(buffer_size);
403             _pool   = pool;
404         }
405 
init(long * counter,uint32_t chunk_cnt=1)406         void init(long *counter, uint32_t chunk_cnt = 1) {
407             _counter    = 0;
408             _io_counter = counter;
409             _chunk_cnt  = chunk_cnt;
410         }
411 
~IoReadResponseCallback()412         ~IoReadResponseCallback() {
413             free(_buffer);
414         }
415 
operator ()(ucs_status_t status)416         virtual void operator()(ucs_status_t status) {
417             /* wait data and response completion */
418             if (++_counter < (1 + _chunk_cnt)) {
419                 return;
420             }
421 
422             ++(*_io_counter);
423             _pool->put(this);
424         }
425 
buffer()426         void* buffer() {
427             return _buffer;
428         }
429 
430     private:
431         long                                _counter;
432         long*                               _io_counter;
433         uint32_t                            _chunk_cnt;
434         void*                               _buffer;
435         MemoryPool<IoReadResponseCallback>* _pool;
436     };
437 
DemoClient(const options_t & test_opts)438     DemoClient(const options_t& test_opts) :
439         P2pDemoCommon(test_opts),
440         _num_sent(0), _num_completed(0), _status(OK), _start_time(get_time()),
441         _retry(0), _callback_pool(opts().iomsg_size) {
442         _status_str[OK]                    = "ok";
443         _status_str[ERROR]                 = "error";
444         _status_str[RUNTIME_EXCEEDED]      = "run-time exceeded";
445         _status_str[CONN_RETRIES_EXCEEDED] = "connection retries exceeded";
446     }
447 
448     typedef enum {
449         OK,
450         ERROR,
451         RUNTIME_EXCEEDED,
452         CONN_RETRIES_EXCEEDED
453     } status_t;
454 
do_io_read(UcxConnection * conn,uint32_t sn)455     size_t do_io_read(UcxConnection *conn, uint32_t sn) {
456         size_t data_size = get_data_size();
457 
458         if (!send_io_message(conn, IO_READ, sn, data_size)) {
459             return data_size;
460         }
461 
462         ++_num_sent;
463         IoReadResponseCallback *r = _callback_pool.get();
464         r->init(&_num_completed, get_chunk_cnt(data_size));
465         recv_data(conn, data_size, sn, r);
466         conn->recv_data(r->buffer(), opts().iomsg_size, sn, r);
467         next_buffer();
468 
469         return data_size;
470     }
471 
do_io_write(UcxConnection * conn,uint32_t sn)472     size_t do_io_write(UcxConnection *conn, uint32_t sn) {
473         size_t data_size = get_data_size();
474 
475         if (!send_io_message(conn, IO_WRITE, sn, data_size)) {
476             return data_size;
477         }
478 
479         ++_num_sent;
480         VERBOSE_LOG << "sending data " << buffer() << " size "
481                     << data_size << " sn " << sn;
482         send_data(conn, data_size, sn);
483         next_buffer();
484 
485         return data_size;
486     }
487 
dispatch_io_message(UcxConnection * conn,const void * buffer,size_t length)488     virtual void dispatch_io_message(UcxConnection* conn, const void *buffer,
489                                      size_t length) {
490         const iomsg_hdr_t *hdr = reinterpret_cast<const iomsg_hdr_t*>(buffer);
491 
492         VERBOSE_LOG << "got io message " << io_op_names[hdr->op] << " sn "
493                     << hdr->sn << " data size " << hdr->data_size
494                     << " conn " << conn;
495 
496         if (hdr->op == IO_COMP) {
497             ++_num_completed;
498         }
499     }
500 
dispatch_connection_error(UcxConnection * conn)501     virtual void dispatch_connection_error(UcxConnection *conn) {
502         LOG << "setting error flag on connection " << conn;
503         _status = ERROR;
504     }
505 
wait_for_responses(long max_outstanding)506     bool wait_for_responses(long max_outstanding) {
507         struct timeval tv_start = {};
508         bool timer_started      = false;
509         struct timeval tv_curr, tv_diff;
510         long count;
511 
512         count = 0;
513         while (((_num_sent - _num_completed) > max_outstanding) && (_status == OK)) {
514             if (count < 1000) {
515                 progress();
516                 ++count;
517                 continue;
518             }
519 
520             count = 0;
521 
522             gettimeofday(&tv_curr, NULL);
523 
524             if (!timer_started) {
525                 tv_start      = tv_curr;
526                 timer_started = true;
527                 continue;
528             }
529 
530             timersub(&tv_curr, &tv_start, &tv_diff);
531             double elapsed = tv_diff.tv_sec + (tv_diff.tv_usec * 1e-6);
532             if (elapsed > _test_opts.client_timeout * 10) {
533                 LOG << "timeout waiting for " << (_num_sent - _num_completed)
534                     << " replies";
535                 _status = ERROR;
536             }
537         }
538 
539         return (_status == OK);
540     }
541 
connect()542     UcxConnection* connect() {
543         struct sockaddr_in connect_addr;
544         memset(&connect_addr, 0, sizeof(connect_addr));
545         connect_addr.sin_family = AF_INET;
546         connect_addr.sin_port   = htons(opts().port_num);
547         inet_pton(AF_INET, opts().server_addr, &connect_addr.sin_addr);
548 
549         return UcxContext::connect((const struct sockaddr*)&connect_addr,
550                                    sizeof(connect_addr));
551     }
552 
get_time()553     static double get_time() {
554         struct timeval tv;
555         gettimeofday(&tv, NULL);
556         return tv.tv_sec + (tv.tv_usec * 1e-6);
557     }
558 
get_time_str()559     static std::string get_time_str() {
560         char str[80];
561         struct timeval tv;
562         gettimeofday(&tv, NULL);
563         snprintf(str, sizeof(str), "[%lu.%06lu]", tv.tv_sec, tv.tv_usec);
564         return str;
565     }
566 
run()567     bool run() {
568         UcxConnection* conn = connect();
569         if (!conn) {
570             return false;
571         }
572 
573         _status = OK;
574 
575         // TODO reset these values by canceling requests
576         _num_sent      = 0;
577         _num_completed = 0;
578 
579         double prev_time     = get_time();
580         long total_iter      = 0;
581         long total_prev_iter = 0;
582         std::vector<op_info_t> info;
583 
584         for (int i = 0; i < IO_COMP; ++i) {
585             op_info_t op_info = {static_cast<io_op_t>(i), 0, 0};
586             info.push_back(op_info);
587         }
588 
589         while ((total_iter < opts().iter_count) && (_status == OK)) {
590             VERBOSE_LOG << " <<<< iteration " << total_iter << " >>>>";
591 
592             if (!wait_for_responses(opts().window_size - 1)) {
593                 break;
594             }
595 
596             io_op_t op = get_op();
597             size_t size;
598             switch (op) {
599             case IO_READ:
600                 size = do_io_read(conn, total_iter);
601                 break;
602             case IO_WRITE:
603                 size = do_io_write(conn, total_iter);
604                 break;
605             default:
606                 abort();
607             }
608 
609             info[op].total_bytes += size;
610             info[op].num_iters++;
611 
612             if (((total_iter % 10) == 0) && (total_iter > total_prev_iter)) {
613                 double curr_time = get_time();
614                 if (curr_time >= (prev_time + 1.0)) {
615                     if (!wait_for_responses(0)) {
616                         break;
617                     }
618 
619                     report_performance(total_iter - total_prev_iter,
620                                        curr_time - prev_time, info);
621 
622                     total_prev_iter = total_iter;
623                     prev_time       = curr_time;
624 
625                     check_time_limit(curr_time);
626                 }
627             }
628 
629             ++total_iter;
630         }
631 
632         if (wait_for_responses(0)) {
633             double curr_time = get_time();
634             report_performance(total_iter - total_prev_iter,
635                                curr_time - prev_time, info);
636             check_time_limit(curr_time);
637         }
638 
639         delete conn;
640         return (_status == OK) || (_status == RUNTIME_EXCEEDED);
641     }
642 
643     // returns true if number of connection retries is exceeded
update_retry()644     bool update_retry() {
645         if (++_retry >= opts().client_retries) {
646             /* client failed all retries */
647             _status = CONN_RETRIES_EXCEEDED;
648             return true;
649         }
650 
651         LOG << "retry " << _retry << "/" << opts().client_retries
652             << " in " << opts().client_timeout << " seconds";
653         usleep((int)(1e6 * opts().client_timeout));
654         return false;
655     }
656 
get_status() const657     status_t get_status() const {
658         return _status;
659     }
660 
get_status_str()661     const std::string& get_status_str() {
662         return _status_str[_status];
663     }
664 
665 private:
666     typedef struct {
667         io_op_t   op;
668         long      num_iters;
669         size_t    total_bytes;
670     } op_info_t;
671 
get_op()672     inline io_op_t get_op() {
673         if (opts().operations.size() == 1) {
674             return opts().operations[0];
675         }
676 
677         return opts().operations[IoDemoRandom::rand(
678                                  0, opts().operations.size() - 1)];
679     }
680 
check_time_limit(double current_time)681     inline void check_time_limit(double current_time) {
682         if ((_status == OK) &&
683             ((current_time - _start_time) >= opts().client_runtime_limit)) {
684             _status = RUNTIME_EXCEEDED;
685         }
686     }
687 
report_performance(long num_iters,double elapsed,std::vector<op_info_t> & info)688     void report_performance(long num_iters, double elapsed,
689                             std::vector<op_info_t> &info) {
690         if (num_iters == 0) {
691             return;
692         }
693 
694         double latency_usec = (elapsed / num_iters) * 1e6;
695         bool first_print    = true;
696 
697         for (unsigned i = 0; i < info.size(); ++i) {
698             op_info_t *op_info = &info[i];
699 
700             if (!op_info->total_bytes) {
701                 continue;
702             }
703 
704             if (first_print) {
705                 std::cout << get_time_str() << " ";
706                 first_print = false;
707             } else {
708                 // print comma for non-first printouts
709                 std::cout << ", ";
710             }
711 
712             double throughput_mbs = op_info->total_bytes /
713                                     elapsed / (1024.0 * 1024.0);
714 
715             std::cout << op_info->num_iters << " "
716                       << io_op_names[op_info->op] << "s at "
717                       << throughput_mbs << " MB/s";
718 
719             // reset for the next round
720             op_info->total_bytes = 0;
721             op_info->num_iters   = 0;
722         }
723 
724         if (!first_print) {
725             if (opts().window_size == 1) {
726                 std::cout << ", average latency: " << latency_usec << " usec";
727             }
728             std::cout << std::endl;
729         }
730     }
731 
732 private:
733     long                               _num_sent;
734     long                               _num_completed;
735     status_t                           _status;
736     std::map<status_t, std::string>    _status_str;
737     double                             _start_time;
738     unsigned                           _retry;
739 protected:
740     MemoryPool<IoReadResponseCallback> _callback_pool;
741 };
742 
set_data_size(char * str,options_t * test_opts)743 static int set_data_size(char *str, options_t *test_opts)
744 {
745     const static char token = ':';
746     char *val1, *val2;
747 
748     if (strchr(str, token) == NULL) {
749         test_opts->min_data_size =
750             test_opts->max_data_size = strtol(str, NULL, 0);
751         return 0;
752     }
753 
754     val1 = strtok(str, ":");
755     val2 = strtok(NULL, ":");
756 
757     if ((val1 != NULL) && (val2 != NULL)) {
758         test_opts->min_data_size = strtol(val1, NULL, 0);
759         test_opts->max_data_size = strtol(val2, NULL, 0);
760     } else if (val1 != NULL) {
761         if (str[0] == ':') {
762             test_opts->min_data_size = 0;
763             test_opts->max_data_size = strtol(val1, NULL, 0);
764         } else {
765             test_opts->min_data_size = strtol(val1, NULL, 0);
766         }
767     } else {
768         return -1;
769     }
770 
771     return 0;
772 }
773 
set_time(char * str,double * dest_p)774 static int set_time(char *str, double *dest_p)
775 {
776     char units[3] = "";
777     int num_fields;
778     double value;
779     double per_sec;
780 
781     if (!strcmp(str, "inf")) {
782         *dest_p = std::numeric_limits<double>::max();
783         return 0;
784     }
785 
786     num_fields = sscanf(str, "%lf%c%c", &value, &units[0], &units[1]);
787     if (num_fields == 1) {
788         per_sec = 1;
789     } else if ((num_fields == 2) || (num_fields == 3)) {
790         if (!strcmp(units, "h")) {
791             per_sec = 1.0 / 3600.0;
792         } else if (!strcmp(units, "m")) {
793             per_sec = 1.0 / 60.0;
794         } else if (!strcmp(units, "s")) {
795             per_sec = 1;
796         } else if (!strcmp(units, "ms")) {
797             per_sec = 1e3;
798         } else if (!strcmp(units, "us")) {
799             per_sec = 1e6;
800         } else if (!strcmp(units, "ns")) {
801             per_sec = 1e9;
802         } else {
803             return -1;
804         }
805     } else {
806         return -1;
807     }
808 
809     *(double*)dest_p = value / per_sec;
810     return 0;
811 }
812 
parse_args(int argc,char ** argv,options_t * test_opts)813 static int parse_args(int argc, char **argv, options_t *test_opts)
814 {
815     char *str;
816     bool found;
817     int c;
818 
819     test_opts->server_addr          = NULL;
820     test_opts->port_num             = 1337;
821     test_opts->client_retries       = std::numeric_limits<long>::max();
822     test_opts->client_timeout       = 1.0;
823     test_opts->client_runtime_limit = std::numeric_limits<double>::max();
824     test_opts->min_data_size        = 4096;
825     test_opts->max_data_size        = 4096;
826     test_opts->chunk_size           = std::numeric_limits<unsigned>::max();
827     test_opts->num_buffers          = 1;
828     test_opts->iomsg_size           = 256;
829     test_opts->iter_count           = 1000;
830     test_opts->window_size          = 1;
831     test_opts->random_seed          = std::time(NULL);
832     test_opts->verbose              = false;
833 
834     while ((c = getopt(argc, argv, "p:c:r:d:b:i:w:k:o:t:l:s:v")) != -1) {
835         switch (c) {
836         case 'p':
837             test_opts->port_num = atoi(optarg);
838             break;
839         case 'c':
840             if (strcmp(optarg, "inf")) {
841                 test_opts->client_retries = strtol(optarg, NULL, 0);
842             }
843             break;
844         case 'r':
845             test_opts->iomsg_size = strtol(optarg, NULL, 0);
846             break;
847         case 'd':
848             if (set_data_size(optarg, test_opts) == -1) {
849                 std::cout << "invalid data size range '" << optarg << "'" << std::endl;
850                 return -1;
851             }
852             break;
853         case 'b':
854             test_opts->num_buffers = strtol(optarg, NULL, 0);
855             if (test_opts->num_buffers == 0) {
856                 std::cout << "number of buffers ('" << optarg << "')"
857                           << " has to be > 0" << std::endl;
858                 return -1;
859             }
860             break;
861         case 'i':
862             test_opts->iter_count = strtol(optarg, NULL, 0);
863             break;
864         case 'w':
865             test_opts->window_size = atoi(optarg);
866             break;
867         case 'k':
868             test_opts->chunk_size = strtol(optarg, NULL, 0);
869             break;
870         case 'o':
871             str = strtok(optarg, ",");
872             while (str != NULL) {
873                 found = false;
874 
875                 for (int op_it = 0; op_it < IO_COMP; ++op_it) {
876                     if (!strcmp(io_op_names[op_it], str)) {
877                         io_op_t op = static_cast<io_op_t>(op_it);
878                         if (std::find(test_opts->operations.begin(),
879                                       test_opts->operations.end(),
880                                       op) == test_opts->operations.end()) {
881                             test_opts->operations.push_back(op);
882                         }
883                         found = true;
884                     }
885                 }
886 
887                 if (!found) {
888                     std::cout << "invalid operation name '" << str << "'" << std::endl;
889                     return -1;
890                 }
891 
892                 str = strtok(NULL, ",");
893             }
894 
895             if (test_opts->operations.size() == 0) {
896                 std::cout << "no operation names were provided '" << optarg << "'" << std::endl;
897                 return -1;
898             }
899             break;
900         case 't':
901             if (set_time(optarg, &test_opts->client_timeout) != 0) {
902                 std::cout << "invalid '" << optarg << "' value for client timeout" << std::endl;
903                 return -1;
904             }
905             break;
906         case 'l':
907             if (set_time(optarg, &test_opts->client_runtime_limit) != 0) {
908                 std::cout << "invalid '" << optarg << "' value for client run-time limit" << std::endl;
909                 return -1;
910             }
911             break;
912         case 's':
913             test_opts->random_seed = strtoul(optarg, NULL, 0);
914             break;
915         case 'v':
916             test_opts->verbose = true;
917             break;
918         case 'h':
919         default:
920             std::cout << "Usage: io_demo [options] [server_address]" << std::endl;
921             std::cout << "" << std::endl;
922             std::cout << "Supported options are:" << std::endl;
923             std::cout << "  -p <port>                  TCP port number to use" << std::endl;
924             std::cout << "  -o <op1,op2,...,opN>       Comma-separated string of IO operations [read|write]" << std::endl;
925             std::cout << "                             NOTE: if using several IO operations, performance" << std::endl;
926             std::cout << "                                   measurments may be inaccurate" << std::endl;
927             std::cout << "  -d <min>:<max>             Range that should be used to get data" << std::endl;
928             std::cout << "                             size of IO payload" << std::endl;
929             std::cout << "  -b <number of buffers>     Number of IO buffers to use for communications" << std::endl;
930             std::cout << "  -i <iterations-count>      Number of iterations to run communication" << std::endl;
931             std::cout << "  -w <window-size>           Number of outstanding requests" << std::endl;
932             std::cout << "  -k <chunk-size>            Split the data transfer to chunks of this size" << std::endl;
933             std::cout << "  -r <io-request-size>       Size of IO request packet" << std::endl;
934             std::cout << "  -c <client retries>        Number of connection retries on client" << std::endl;
935             std::cout << "                             (or \"inf\") for failure" << std::endl;
936             std::cout << "  -t <client timeout>        Client timeout (or \"inf\")" << std::endl;
937             std::cout << "  -l <client run-time limit> Time limit to run the IO client (or \"inf\")" << std::endl;
938             std::cout << "                             Examples: -l 17.5s; -l 10m; 15.5h" << std::endl;
939             std::cout << "  -s <random seed>           Random seed to use for randomizing" << std::endl;
940             std::cout << "  -v                         Set verbose mode" << std::endl;
941             std::cout << "" << std::endl;
942             return -1;
943         }
944     }
945 
946     if (optind < argc) {
947         test_opts->server_addr = argv[optind];
948     }
949 
950     if (test_opts->operations.size() == 0) {
951         test_opts->operations.push_back(IO_WRITE);
952     }
953 
954     return 0;
955 }
956 
do_server(const options_t & test_opts)957 static int do_server(const options_t& test_opts)
958 {
959     DemoServer server(test_opts);
960     if (!server.init()) {
961         return -1;
962     }
963 
964     server.run();
965     return 0;
966 }
967 
do_client(const options_t & test_opts)968 static int do_client(const options_t& test_opts)
969 {
970     IoDemoRandom::srand(test_opts.random_seed);
971     LOG << "random seed: " << test_opts.random_seed;
972 
973     DemoClient client(test_opts);
974     if (!client.init()) {
975         return -1;
976     }
977 
978     for (;;) {
979         if (client.run()) {
980             /* successful run */
981             break;
982         }
983 
984         if (client.update_retry()) {
985             break;
986         }
987     }
988 
989     DemoClient::status_t status = client.get_status();
990     LOG << "client exit with \"" << client.get_status_str() << "\" status";
991     return ((status == DemoClient::OK) ||
992             (status == DemoClient::RUNTIME_EXCEEDED)) ? 0 : -1;
993 }
994 
main(int argc,char ** argv)995 int main(int argc, char **argv)
996 {
997     options_t test_opts;
998     int ret;
999 
1000     ret = parse_args(argc, argv, &test_opts);
1001     if (ret < 0) {
1002         return ret;
1003     }
1004 
1005     if (test_opts.server_addr == NULL) {
1006         return do_server(test_opts);
1007     } else {
1008         return do_client(test_opts);
1009     }
1010 }
1011