1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "task.h"
23 #include "uv.h"
24 
25 #include <math.h>
26 #include <stdio.h>
27 
28 
29 static int TARGET_CONNECTIONS;
30 #define WRITE_BUFFER_SIZE           8192
31 #define MAX_SIMULTANEOUS_CONNECTS   100
32 
33 #define PRINT_STATS                 0
34 #define STATS_INTERVAL              1000 /* msec */
35 #define STATS_COUNT                 5
36 
37 
38 static void do_write(uv_stream_t*);
39 static void maybe_connect_some();
40 
41 static uv_req_t* req_alloc();
42 static void req_free(uv_req_t* uv_req);
43 
44 static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf);
45 static void buf_free(const uv_buf_t* buf);
46 
47 static uv_loop_t* loop;
48 
49 static uv_tcp_t tcpServer;
50 static uv_pipe_t pipeServer;
51 static uv_stream_t* server;
52 static struct sockaddr_in listen_addr;
53 static struct sockaddr_in connect_addr;
54 
55 static int64_t start_time;
56 
57 static int max_connect_socket = 0;
58 static int max_read_sockets = 0;
59 static int read_sockets = 0;
60 static int write_sockets = 0;
61 
62 static int64_t nrecv = 0;
63 static int64_t nrecv_total = 0;
64 static int64_t nsent = 0;
65 static int64_t nsent_total = 0;
66 
67 static int stats_left = 0;
68 
69 static char write_buffer[WRITE_BUFFER_SIZE];
70 
71 /* Make this as large as you need. */
72 #define MAX_WRITE_HANDLES 1000
73 
74 static stream_type type;
75 
76 static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES];
77 static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES];
78 
79 static uv_timer_t timer_handle;
80 
81 
gbit(int64_t bytes,int64_t passed_ms)82 static double gbit(int64_t bytes, int64_t passed_ms) {
83   double gbits = ((double)bytes / (1024 * 1024 * 1024)) * 8;
84   return gbits / ((double)passed_ms / 1000);
85 }
86 
87 
show_stats(uv_timer_t * handle,int status)88 static void show_stats(uv_timer_t* handle, int status) {
89   int64_t diff;
90   int i;
91 
92 #if PRINT_STATS
93   LOGF("connections: %d, write: %.1f gbit/s\n",
94        write_sockets,
95        gbit(nsent, STATS_INTERVAL));
96 #endif
97 
98   /* Exit if the show is over */
99   if (!--stats_left) {
100 
101     uv_update_time(loop);
102     diff = uv_now(loop) - start_time;
103 
104     LOGF("%s_pump%d_client: %.1f gbit/s\n",
105          type == TCP ? "tcp" : "pipe",
106          write_sockets,
107          gbit(nsent_total, diff));
108 
109     for (i = 0; i < write_sockets; i++) {
110       if (type == TCP)
111         uv_close((uv_handle_t*) &tcp_write_handles[i], NULL);
112       else
113         uv_close((uv_handle_t*) &pipe_write_handles[i], NULL);
114     }
115 
116     exit(0);
117   }
118 
119   /* Reset read and write counters */
120   nrecv = 0;
121   nsent = 0;
122 }
123 
124 
read_show_stats(void)125 static void read_show_stats(void) {
126   int64_t diff;
127 
128   uv_update_time(loop);
129   diff = uv_now(loop) - start_time;
130 
131   LOGF("%s_pump%d_server: %.1f gbit/s\n",
132        type == TCP ? "tcp" : "pipe",
133        max_read_sockets,
134        gbit(nrecv_total, diff));
135 }
136 
137 
138 
read_sockets_close_cb(uv_handle_t * handle)139 static void read_sockets_close_cb(uv_handle_t* handle) {
140   free(handle);
141   read_sockets--;
142 
143   /* If it's past the first second and everyone has closed their connection
144    * Then print stats.
145    */
146   if (uv_now(loop) - start_time > 1000 && read_sockets == 0) {
147     read_show_stats();
148     uv_close((uv_handle_t*)server, NULL);
149   }
150 }
151 
152 
start_stats_collection(void)153 static void start_stats_collection(void) {
154   int r;
155 
156   /* Show-stats timer */
157   stats_left = STATS_COUNT;
158   r = uv_timer_init(loop, &timer_handle);
159   ASSERT(r == 0);
160   r = uv_timer_start(&timer_handle, show_stats, STATS_INTERVAL, STATS_INTERVAL);
161   ASSERT(r == 0);
162 
163   uv_update_time(loop);
164   start_time = uv_now(loop);
165 }
166 
167 
read_cb(uv_stream_t * stream,ssize_t bytes,const uv_buf_t * buf)168 static void read_cb(uv_stream_t* stream, ssize_t bytes, const uv_buf_t* buf) {
169   if (nrecv_total == 0) {
170     ASSERT(start_time == 0);
171     uv_update_time(loop);
172     start_time = uv_now(loop);
173   }
174 
175   if (bytes < 0) {
176     uv_close((uv_handle_t*)stream, read_sockets_close_cb);
177     return;
178   }
179 
180   buf_free(buf);
181 
182   nrecv += bytes;
183   nrecv_total += bytes;
184 }
185 
186 
write_cb(uv_write_t * req,int status)187 static void write_cb(uv_write_t* req, int status) {
188   ASSERT(status == 0);
189 
190   req_free((uv_req_t*) req);
191 
192   nsent += sizeof write_buffer;
193   nsent_total += sizeof write_buffer;
194 
195   do_write((uv_stream_t*) req->handle);
196 }
197 
198 
do_write(uv_stream_t * stream)199 static void do_write(uv_stream_t* stream) {
200   uv_write_t* req;
201   uv_buf_t buf;
202   int r;
203 
204   buf.base = (char*) &write_buffer;
205   buf.len = sizeof write_buffer;
206 
207   req = (uv_write_t*) req_alloc();
208   r = uv_write(req, stream, &buf, 1, write_cb);
209   ASSERT(r == 0);
210 }
211 
212 
connect_cb(uv_connect_t * req,int status)213 static void connect_cb(uv_connect_t* req, int status) {
214   int i;
215 
216   if (status) LOG(uv_strerror(status));
217   ASSERT(status == 0);
218 
219   write_sockets++;
220   req_free((uv_req_t*) req);
221 
222   maybe_connect_some();
223 
224   if (write_sockets == TARGET_CONNECTIONS) {
225     start_stats_collection();
226 
227     /* Yay! start writing */
228     for (i = 0; i < write_sockets; i++) {
229       if (type == TCP)
230         do_write((uv_stream_t*) &tcp_write_handles[i]);
231       else
232         do_write((uv_stream_t*) &pipe_write_handles[i]);
233     }
234   }
235 }
236 
237 
maybe_connect_some(void)238 static void maybe_connect_some(void) {
239   uv_connect_t* req;
240   uv_tcp_t* tcp;
241   uv_pipe_t* pipe;
242   int r;
243 
244   while (max_connect_socket < TARGET_CONNECTIONS &&
245          max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) {
246     if (type == TCP) {
247       tcp = &tcp_write_handles[max_connect_socket++];
248 
249       r = uv_tcp_init(loop, tcp);
250       ASSERT(r == 0);
251 
252       req = (uv_connect_t*) req_alloc();
253       r = uv_tcp_connect(req,
254                          tcp,
255                          (const struct sockaddr*) &connect_addr,
256                          connect_cb);
257       ASSERT(r == 0);
258     } else {
259       pipe = &pipe_write_handles[max_connect_socket++];
260 
261       r = uv_pipe_init(loop, pipe, 0);
262       ASSERT(r == 0);
263 
264       req = (uv_connect_t*) req_alloc();
265       uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);
266     }
267   }
268 }
269 
270 
connection_cb(uv_stream_t * s,int status)271 static void connection_cb(uv_stream_t* s, int status) {
272   uv_stream_t* stream;
273   int r;
274 
275   ASSERT(server == s);
276   ASSERT(status == 0);
277 
278   if (type == TCP) {
279     stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
280     r = uv_tcp_init(loop, (uv_tcp_t*)stream);
281     ASSERT(r == 0);
282   } else {
283     stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t));
284     r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0);
285     ASSERT(r == 0);
286   }
287 
288   r = uv_accept(s, stream);
289   ASSERT(r == 0);
290 
291   r = uv_read_start(stream, buf_alloc, read_cb);
292   ASSERT(r == 0);
293 
294   read_sockets++;
295   max_read_sockets++;
296 }
297 
298 
299 /*
300  * Request allocator
301  */
302 
303 typedef struct req_list_s {
304   union uv_any_req uv_req;
305   struct req_list_s* next;
306 } req_list_t;
307 
308 
309 static req_list_t* req_freelist = NULL;
310 
311 
req_alloc(void)312 static uv_req_t* req_alloc(void) {
313   req_list_t* req;
314 
315   req = req_freelist;
316   if (req != NULL) {
317     req_freelist = req->next;
318     return (uv_req_t*) req;
319   }
320 
321   req = (req_list_t*) malloc(sizeof *req);
322   return (uv_req_t*) req;
323 }
324 
325 
req_free(uv_req_t * uv_req)326 static void req_free(uv_req_t* uv_req) {
327   req_list_t* req = (req_list_t*) uv_req;
328 
329   req->next = req_freelist;
330   req_freelist = req;
331 }
332 
333 
334 /*
335  * Buffer allocator
336  */
337 
338 typedef struct buf_list_s {
339   uv_buf_t uv_buf_t;
340   struct buf_list_s* next;
341 } buf_list_t;
342 
343 
344 static buf_list_t* buf_freelist = NULL;
345 
346 
buf_alloc(uv_handle_t * handle,size_t size,uv_buf_t * buf)347 static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
348   buf_list_t* ab;
349 
350   ab = buf_freelist;
351   if (ab != NULL)
352     buf_freelist = ab->next;
353   else {
354     ab = malloc(size + sizeof(*ab));
355     ab->uv_buf_t.len = size;
356     ab->uv_buf_t.base = (char*) (ab + 1);
357   }
358 
359   *buf = ab->uv_buf_t;
360 }
361 
362 
buf_free(const uv_buf_t * buf)363 static void buf_free(const uv_buf_t* buf) {
364   buf_list_t* ab = (buf_list_t*) buf->base - 1;
365   ab->next = buf_freelist;
366   buf_freelist = ab;
367 }
368 
369 
HELPER_IMPL(tcp_pump_server)370 HELPER_IMPL(tcp_pump_server) {
371   int r;
372 
373   type = TCP;
374   loop = uv_default_loop();
375 
376   ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &listen_addr));
377 
378   /* Server */
379   server = (uv_stream_t*)&tcpServer;
380   r = uv_tcp_init(loop, &tcpServer);
381   ASSERT(r == 0);
382   r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &listen_addr, 0);
383   ASSERT(r == 0);
384   r = uv_listen((uv_stream_t*)&tcpServer, MAX_WRITE_HANDLES, connection_cb);
385   ASSERT(r == 0);
386 
387   uv_run(loop, UV_RUN_DEFAULT);
388 
389   return 0;
390 }
391 
392 
HELPER_IMPL(pipe_pump_server)393 HELPER_IMPL(pipe_pump_server) {
394   int r;
395   type = PIPE;
396 
397   loop = uv_default_loop();
398 
399   /* Server */
400   server = (uv_stream_t*)&pipeServer;
401   r = uv_pipe_init(loop, &pipeServer, 0);
402   ASSERT(r == 0);
403   r = uv_pipe_bind(&pipeServer, TEST_PIPENAME);
404   ASSERT(r == 0);
405   r = uv_listen((uv_stream_t*)&pipeServer, MAX_WRITE_HANDLES, connection_cb);
406   ASSERT(r == 0);
407 
408   uv_run(loop, UV_RUN_DEFAULT);
409 
410   MAKE_VALGRIND_HAPPY();
411   return 0;
412 }
413 
414 
tcp_pump(int n)415 static void tcp_pump(int n) {
416   ASSERT(n <= MAX_WRITE_HANDLES);
417   TARGET_CONNECTIONS = n;
418   type = TCP;
419 
420   loop = uv_default_loop();
421 
422   ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &connect_addr));
423 
424   /* Start making connections */
425   maybe_connect_some();
426 
427   uv_run(loop, UV_RUN_DEFAULT);
428 
429   MAKE_VALGRIND_HAPPY();
430 }
431 
432 
pipe_pump(int n)433 static void pipe_pump(int n) {
434   ASSERT(n <= MAX_WRITE_HANDLES);
435   TARGET_CONNECTIONS = n;
436   type = PIPE;
437 
438   loop = uv_default_loop();
439 
440   /* Start making connections */
441   maybe_connect_some();
442 
443   uv_run(loop, UV_RUN_DEFAULT);
444 
445   MAKE_VALGRIND_HAPPY();
446 }
447 
448 
BENCHMARK_IMPL(tcp_pump100_client)449 BENCHMARK_IMPL(tcp_pump100_client) {
450   tcp_pump(100);
451   return 0;
452 }
453 
454 
BENCHMARK_IMPL(tcp_pump1_client)455 BENCHMARK_IMPL(tcp_pump1_client) {
456   tcp_pump(1);
457   return 0;
458 }
459 
460 
BENCHMARK_IMPL(pipe_pump100_client)461 BENCHMARK_IMPL(pipe_pump100_client) {
462   pipe_pump(100);
463   return 0;
464 }
465 
466 
BENCHMARK_IMPL(pipe_pump1_client)467 BENCHMARK_IMPL(pipe_pump1_client) {
468   pipe_pump(1);
469   return 0;
470 }
471