1 #include <sys/param.h> 2 #include <sys/event.h> 3 #include <sys/ioctl.h> 4 #include <sys/queue.h> 5 #include <sys/socket.h> 6 #include <sys/sysctl.h> 7 #include <sys/time.h> 8 9 #include <machine/atomic.h> 10 #ifdef __FreeBSD__ 11 #include <machine/cpu.h> 12 #endif 13 #include <machine/cpufunc.h> 14 15 #include <arpa/inet.h> 16 #include <netinet/in.h> 17 18 #include <err.h> 19 #include <errno.h> 20 #include <pthread.h> 21 #include <pthread_np.h> 22 #include <signal.h> 23 #include <stdbool.h> 24 #include <stdio.h> 25 #include <stdint.h> 26 #include <stdlib.h> 27 #include <string.h> 28 #include <unistd.h> 29 30 #include "kq_sendrecv_proto.h" 31 32 /* 33 * Note about the sender start synchronization. 34 * 35 * We apply two stage synchronization. The first stage uses pthread 36 * condition (it sleeps), which waits for the establishment for all 37 * connections, which could be slow. The second stage uses g_nwait 38 * of send_globctx; all relevant threads spin on g_nwait. The main 39 * thread spin-waits for all senders to increase g_nwait. The sender 40 * thread increases the g_nwait, then it spin-waits for main thread 41 * to reset g_nwait. In this way, we can make sure that all senders 42 * start roughly at the same time. 43 */ 44 45 #if 0 46 #define SEND_DEBUG 47 #endif 48 #if 0 49 #define SEND_TIME_DEBUG 50 #endif 51 52 #define SEND_DUR 10 53 #define SEND_EVENT_MAX 64 54 #define SEND_BUFLEN (128 * 1024) 55 56 /* 57 * The successful 3-way handshake on the connection does not mean the 58 * remote application can accept(2) this connection. Even worse, the 59 * remote side's network stack may drop the connection silently, i.e. 60 * w/o RST. If this happened, the blocking read(2) would not return, 61 * until the keepalive kicked in, which would take quite some time. 62 * This is obviously not what we want here, so use synthetic timeout 63 * for blocking read(2). Here, we will retry if a blocking read(2) 64 * times out. 65 */ 66 #define SEND_READTO_MS 1000 /* unit: ms */ 67 68 #if defined(__DragonFly__) 69 #define SEND_CONN_CTX_ALIGN __VM_CACHELINE_SIZE 70 #elif defined(__FreeBSD__) 71 #define SEND_CONN_CTX_ALIGN CACHE_LINE_SIZE 72 #else 73 #define SEND_CONN_CTX_ALIGN 64 /* XXX */ 74 #endif 75 76 struct conn_ctx { 77 int c_s; 78 int c_err; 79 uint64_t c_stat; 80 struct timespec c_terr; 81 82 STAILQ_ENTRY(conn_ctx) c_glob_link; 83 STAILQ_ENTRY(conn_ctx) c_link; 84 struct sockaddr_in c_in; 85 int c_thr_id; 86 } __aligned(SEND_CONN_CTX_ALIGN); 87 88 STAILQ_HEAD(conn_ctx_list, conn_ctx); 89 90 struct send_globctx { 91 struct conn_ctx_list g_conn; 92 93 int g_dur; 94 int g_nconn; 95 pthread_mutex_t g_lock; 96 pthread_cond_t g_cond; 97 98 volatile u_int g_nwait; 99 int g_readto_ms; /* unit: ms */ 100 int g_buflen; 101 bool g_sendfile; 102 }; 103 104 struct send_thrctx { 105 struct conn_ctx_list t_conn; 106 pthread_mutex_t t_lock; 107 pthread_cond_t t_cond; 108 109 struct send_globctx *t_glob; 110 struct timespec t_start; 111 struct timespec t_end; 112 double t_run_us; /* unit: us */ 113 114 pthread_t t_tid; 115 int t_id; 116 }; 117 118 static void send_build_addrlist(const struct sockaddr_in *, int, 119 const struct sockaddr_in **, int *, int); 120 static void *send_thread(void *); 121 122 static __inline void 123 send_spinwait(void) 124 { 125 #if defined(__DragonFly__) 126 cpu_pause(); 127 #elif defined(__FreeBSD__) 128 cpu_spinwait(); 129 #else 130 /* XXX nothing */ 131 #endif 132 } 133 134 static void 135 usage(const char *cmd) 136 { 137 fprintf(stderr, "%s -4 addr4 [-4 addr4 ...] [-p port] " 138 "-c conns [-t nthreads] [-l sec] [-r readto_ms] [-S] [-E] " 139 "[-b buflen] [-B]\n", cmd); 140 exit(2); 141 } 142 143 int 144 main(int argc, char *argv[]) 145 { 146 struct send_globctx glob; 147 struct send_thrctx *ctx_arr, *ctx; 148 struct sockaddr_in *in_arr, *in; 149 const struct sockaddr_in *daddr; 150 struct timespec run, end, start; 151 double total_run_us, total, conn_min, conn_max; 152 double jain, jain_res; 153 int jain_cnt; 154 struct conn_ctx *conn; 155 sigset_t sigset; 156 int opt, i, ncpus; 157 int in_arr_cnt, in_arr_sz, ndaddr; 158 int nthr, nconn, dur, readto_ms, buflen; 159 int log_err, err_cnt, has_minmax; 160 u_short port = RECV_PORT; 161 uint32_t idx; 162 size_t sz; 163 bool do_sendfile = false, bindcpu = false; 164 165 sigemptyset(&sigset); 166 sigaddset(&sigset, SIGPIPE); 167 if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0) 168 err(1, "sigprocmask failed"); 169 170 sz = sizeof(ncpus); 171 if (sysctlbyname("hw.ncpu", &ncpus, &sz, NULL, 0) < 0) 172 err(1, "sysctl hw.ncpu failed"); 173 nthr = ncpus; 174 175 in_arr_sz = 4; 176 in_arr_cnt = 0; 177 in_arr = malloc(in_arr_sz * sizeof(struct sockaddr_in)); 178 if (in_arr == NULL) 179 err(1, "malloc failed"); 180 181 log_err = 0; 182 nconn = 0; 183 dur = SEND_DUR; 184 readto_ms = SEND_READTO_MS; 185 buflen = SEND_BUFLEN; 186 187 while ((opt = getopt(argc, argv, "4:BESb:c:l:p:r:t:")) != -1) { 188 switch (opt) { 189 case '4': 190 if (in_arr_cnt == in_arr_sz) { 191 in_arr_sz *= 2; 192 in_arr = reallocf(in_arr, 193 in_arr_sz * sizeof(struct sockaddr_in)); 194 if (in_arr == NULL) 195 err(1, "reallocf failed"); 196 } 197 in = &in_arr[in_arr_cnt]; 198 ++in_arr_cnt; 199 200 memset(in, 0, sizeof(*in)); 201 in->sin_family = AF_INET; 202 if (inet_pton(AF_INET, optarg, &in->sin_addr) <= 0) 203 errx(1, "inet_pton failed %s", optarg); 204 break; 205 206 case 'B': 207 bindcpu = true; 208 break; 209 210 case 'E': 211 log_err = 1; 212 break; 213 214 case 'S': 215 do_sendfile = true; 216 break; 217 218 case 'b': 219 buflen = strtol(optarg, NULL, 10); 220 if (buflen <= 0) 221 errx(1, "invalid -b"); 222 break; 223 224 case 'c': 225 nconn = strtol(optarg, NULL, 10); 226 if (nconn <= 0) 227 errx(1, "invalid -c"); 228 break; 229 230 case 'l': 231 dur = strtoul(optarg, NULL, 10); 232 if (dur == 0) 233 errx(1, "invalid -l"); 234 break; 235 236 case 'p': 237 port = strtoul(optarg, NULL, 10); 238 break; 239 240 case 'r': 241 readto_ms = strtol(optarg, NULL, 10); 242 if (readto_ms <= 0) 243 errx(1, "invalid -r"); 244 break; 245 246 case 't': 247 nthr = strtol(optarg, NULL, 10); 248 if (nthr <= 0) 249 errx(1, "invalid -t"); 250 break; 251 252 default: 253 usage(argv[0]); 254 } 255 } 256 if (in_arr_cnt == 0 || nconn == 0) 257 errx(1, "either -4 or -c are specified"); 258 259 if (nthr > nconn) 260 nthr = nconn; 261 262 for (i = 0; i < in_arr_cnt; ++i) 263 in_arr[i].sin_port = htons(port); 264 265 ctx_arr = calloc(nthr, sizeof(struct send_thrctx)); 266 if (ctx_arr == NULL) 267 err(1, "calloc failed"); 268 269 memset(&glob, 0, sizeof(glob)); 270 STAILQ_INIT(&glob.g_conn); 271 glob.g_nconn = nconn; 272 glob.g_nwait = 1; /* count self in */ 273 glob.g_dur = dur; 274 glob.g_readto_ms = readto_ms; 275 glob.g_sendfile = do_sendfile; 276 glob.g_buflen = buflen; 277 pthread_mutex_init(&glob.g_lock, NULL); 278 pthread_cond_init(&glob.g_cond, NULL); 279 280 pthread_set_name_np(pthread_self(), "main"); 281 282 /* Build receiver address list */ 283 send_build_addrlist(in_arr, in_arr_cnt, &daddr, &ndaddr, readto_ms); 284 285 /* 286 * Start senders. 287 */ 288 for (i = 0; i < nthr; ++i) { 289 pthread_attr_t attr; 290 int error; 291 292 ctx = &ctx_arr[i]; 293 STAILQ_INIT(&ctx->t_conn); 294 ctx->t_id = i; 295 ctx->t_glob = &glob; 296 pthread_mutex_init(&ctx->t_lock, NULL); 297 pthread_cond_init(&ctx->t_cond, NULL); 298 299 pthread_attr_init(&attr); 300 if (bindcpu) { 301 #ifdef __FreeBSD__ 302 cpuset_t mask; 303 #else 304 cpu_set_t mask; 305 #endif 306 307 CPU_ZERO(&mask); 308 CPU_SET(i % ncpus, &mask); 309 error = pthread_attr_setaffinity_np(&attr, 310 sizeof(mask), &mask); 311 if (error) { 312 errc(1, error, "pthread_attr_setaffinity_np " 313 "failed"); 314 } 315 } 316 317 error = pthread_create(&ctx->t_tid, &attr, send_thread, ctx); 318 if (error) 319 errc(1, error, "pthread_create failed"); 320 pthread_attr_destroy(&attr); 321 } 322 323 /* 324 * Distribute connections to senders. 325 * 326 * NOTE: 327 * We start from a random position in the address list, so that the 328 * first several receiving servers will not be abused, if the number 329 * of connections is small and there are many clients. 330 */ 331 idx = arc4random_uniform(ndaddr); 332 for (i = 0; i < nconn; ++i) { 333 const struct sockaddr_in *da; 334 335 da = &daddr[idx % ndaddr]; 336 ++idx; 337 338 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn)); 339 if (conn == NULL) 340 err(1, "aligned_alloc failed"); 341 memset(conn, 0, sizeof(*conn)); 342 conn->c_in = *da; 343 conn->c_s = -1; 344 345 ctx = &ctx_arr[i % nthr]; 346 conn->c_thr_id = ctx->t_id; 347 348 pthread_mutex_lock(&ctx->t_lock); 349 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link); 350 pthread_mutex_unlock(&ctx->t_lock); 351 pthread_cond_signal(&ctx->t_cond); 352 353 /* Add to the global list for results gathering */ 354 STAILQ_INSERT_TAIL(&glob.g_conn, conn, c_glob_link); 355 } 356 357 /* 358 * No more connections; notify the senders. 359 * 360 * NOTE: 361 * The marker for 'the end of connection list' has 0 in its 362 * c_in.sin_port. 363 */ 364 for (i = 0; i < nthr; ++i) { 365 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn)); 366 if (conn == NULL) 367 err(1, "aligned_alloc failed"); 368 memset(conn, 0, sizeof(*conn)); 369 conn->c_s = -1; 370 371 ctx = &ctx_arr[i]; 372 pthread_mutex_lock(&ctx->t_lock); 373 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link); 374 pthread_mutex_unlock(&ctx->t_lock); 375 pthread_cond_signal(&ctx->t_cond); 376 } 377 378 /* 379 * Sender start sync, stage 1: 380 * Wait for connections establishment (slow). 381 */ 382 pthread_mutex_lock(&glob.g_lock); 383 while (glob.g_nconn != 0) 384 pthread_cond_wait(&glob.g_cond, &glob.g_lock); 385 pthread_mutex_unlock(&glob.g_lock); 386 387 /* 388 * Sender start sync, stage 2: 389 * Wait for senders to spin-wait; and once all senders spin-wait, 390 * release them by resetting g_nwait. 391 */ 392 while (atomic_cmpset_int(&glob.g_nwait, nthr + 1, 0) == 0) 393 send_spinwait(); 394 395 fprintf(stderr, "start %d seconds sending test: %d threads, " 396 "%d connections\n", dur, nthr, nconn); 397 398 /* 399 * Wait for the senders to finish and gather the results. 400 */ 401 402 memset(&end, 0, sizeof(end)); /* XXX stupid gcc warning */ 403 memset(&start, 0, sizeof(start)); /* XXX stupid gcc warning */ 404 405 for (i = 0; i < nthr; ++i) { 406 ctx = &ctx_arr[i]; 407 pthread_join(ctx->t_tid, NULL); 408 409 timespecsub(&ctx->t_end, &ctx->t_start, &run); 410 ctx->t_run_us = ((double)run.tv_sec * 1000000.0) + 411 ((double)run.tv_nsec / 1000.0); 412 413 if (i == 0) { 414 start = ctx->t_start; 415 end = ctx->t_end; 416 } else { 417 if (timespeccmp(&start, &ctx->t_start, >)) 418 start = ctx->t_start; 419 if (timespeccmp(&end, &ctx->t_end, <)) 420 end = ctx->t_end; 421 } 422 423 #ifdef SEND_TIME_DEBUG 424 fprintf(stderr, "start %ld.%ld, end %ld.%ld\n", 425 ctx->t_start.tv_sec, ctx->t_start.tv_nsec, 426 ctx->t_end.tv_sec, ctx->t_end.tv_nsec); 427 #endif 428 } 429 430 #ifdef SEND_TIME_DEBUG 431 fprintf(stderr, "start %ld.%ld, end %ld.%ld (final)\n", 432 start.tv_sec, start.tv_nsec, end.tv_sec, end.tv_nsec); 433 #endif 434 435 timespecsub(&end, &start, &run); 436 total_run_us = ((double)run.tv_sec * 1000000.0) + 437 ((double)run.tv_nsec / 1000.0); 438 total = 0.0; 439 440 err_cnt = 0; 441 has_minmax = 0; 442 conn_min = 0.0; 443 conn_max = 0.0; 444 445 jain = 0.0; 446 jain_res = 0.0; 447 jain_cnt = 0; 448 449 STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) { 450 total += conn->c_stat; 451 if (conn->c_err == 0) { 452 double perf; /* unit: Mbps */ 453 454 perf = (conn->c_stat * 8.0) / 455 ctx_arr[conn->c_thr_id].t_run_us; 456 if (!has_minmax) { 457 conn_min = perf; 458 conn_max = perf; 459 has_minmax = 1; 460 } else { 461 if (perf > conn_max) 462 conn_max = perf; 463 if (perf < conn_min) 464 conn_min = perf; 465 } 466 jain += (perf * perf); 467 jain_res += perf; 468 ++jain_cnt; 469 } else { 470 ++err_cnt; 471 } 472 } 473 474 jain *= jain_cnt; 475 jain = (jain_res * jain_res) / jain; 476 477 printf("Total: %.2lf Mbps, min/max %.2lf Mbps/%.2lf Mbps, jain %.2lf, " 478 "error %d\n", (total * 8.0) / total_run_us, conn_min, conn_max, 479 jain, err_cnt); 480 481 if (log_err && err_cnt) { 482 STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) { 483 char name[INET_ADDRSTRLEN]; 484 double tmp_run; 485 486 if (conn->c_err == 0) 487 continue; 488 489 run = conn->c_terr; 490 timespecsub(&conn->c_terr, 491 &ctx_arr[conn->c_thr_id].t_start, &run); 492 tmp_run = ((double)run.tv_sec * 1000000.0) + 493 ((double)run.tv_nsec / 1000.0); 494 fprintf(stderr, "snd%d ->%s:%d, %ld sec, %.2lf Mbps, " 495 "errno %d\n", 496 conn->c_thr_id, 497 inet_ntop(AF_INET, &conn->c_in.sin_addr, 498 name, sizeof(name)), 499 ntohs(conn->c_in.sin_port), 500 run.tv_sec, (conn->c_stat * 8.0) / tmp_run, 501 conn->c_err); 502 --err_cnt; 503 if (err_cnt == 0) 504 break; 505 } 506 } 507 508 exit(0); 509 } 510 511 static void 512 send_build_addrlist(const struct sockaddr_in *in_arr, int in_arr_cnt, 513 const struct sockaddr_in **daddr0, int *ndaddr0, int readto_ms) 514 { 515 struct sockaddr_in *daddr; 516 struct timeval readto; 517 int i, ndaddr; 518 519 daddr = NULL; 520 ndaddr = 0; 521 522 memset(&readto, 0, sizeof(readto)); 523 readto.tv_sec = readto_ms / 1000; 524 readto.tv_usec = (readto_ms % 1000) * 1000; 525 526 for (i = 0; i < in_arr_cnt; ++i) { 527 const struct sockaddr_in *in = &in_arr[i]; 528 struct recv_info info_hdr; 529 uint16_t *ports; 530 int s, n, ports_sz, d; 531 532 again: 533 s = socket(AF_INET, SOCK_STREAM, 0); 534 if (s < 0) 535 err(1, "socket failed"); 536 537 if (connect(s, (const struct sockaddr *)in, sizeof(*in)) < 0) 538 err(1, "connect failed"); 539 540 if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, 541 &readto, sizeof(readto)) < 0) 542 err(1, "setsockopt(RCVTIMEO) failed"); 543 544 n = read(s, &info_hdr, sizeof(info_hdr)); 545 if (n != sizeof(info_hdr)) { 546 if (n < 0) { 547 if (errno == EAGAIN) { 548 close(s); 549 goto again; 550 } 551 err(1, "read info hdr failed"); 552 } else { 553 errx(1, "read truncated info hdr"); 554 } 555 } 556 if (info_hdr.ndport == 0) { 557 close(s); 558 continue; 559 } 560 561 ports_sz = info_hdr.ndport * sizeof(uint16_t); 562 ports = malloc(ports_sz); 563 if (ports == NULL) 564 err(1, "malloc failed"); 565 566 n = read(s, ports, ports_sz); 567 if (n != ports_sz) { 568 if (n < 0) { 569 if (errno == EAGAIN) { 570 free(ports); 571 close(s); 572 goto again; 573 } 574 err(1, "read ports failed"); 575 } else { 576 errx(1, "read truncated ports"); 577 } 578 } 579 580 daddr = reallocf(daddr, 581 (ndaddr + info_hdr.ndport) * sizeof(struct sockaddr_in)); 582 if (daddr == NULL) 583 err(1, "reallocf failed"); 584 585 for (d = ndaddr; d < ndaddr + info_hdr.ndport; ++d) { 586 struct sockaddr_in *da = &daddr[d]; 587 588 *da = *in; 589 da->sin_port = ports[d - ndaddr]; 590 } 591 ndaddr += info_hdr.ndport; 592 593 free(ports); 594 close(s); 595 } 596 597 #ifdef SEND_DEBUG 598 for (i = 0; i < ndaddr; ++i) { 599 const struct sockaddr_in *da = &daddr[i]; 600 char name[INET_ADDRSTRLEN]; 601 602 fprintf(stderr, "%s:%d\n", 603 inet_ntop(AF_INET, &da->sin_addr, name, sizeof(name)), 604 ntohs(da->sin_port)); 605 } 606 #endif 607 608 *daddr0 = daddr; 609 *ndaddr0 = ndaddr; 610 } 611 612 static void * 613 send_thread(void *xctx) 614 { 615 struct send_thrctx *ctx = xctx; 616 struct conn_ctx *timeo; 617 struct kevent chg_evt; 618 uint8_t *buf; 619 int nconn = 0, kq, n, fd = -1, buflen; 620 char name[32]; 621 622 snprintf(name, sizeof(name), "snd%d", ctx->t_id); 623 pthread_set_name_np(pthread_self(), name); 624 625 buflen = ctx->t_glob->g_buflen; 626 buf = malloc(buflen); 627 if (buf == NULL) 628 err(1, "malloc(%d) failed", buflen); 629 630 if (ctx->t_glob->g_sendfile) { 631 char filename[] = "sendtmpXXX"; 632 633 fd = mkstemp(filename); 634 if (fd < 0) 635 err(1, "mkstemp failed"); 636 if (write(fd, buf, buflen) != buflen) 637 err(1, "write to file failed"); 638 unlink(filename); 639 free(buf); 640 buf = NULL; 641 } 642 643 kq = kqueue(); 644 if (kq < 0) 645 err(1, "kqueue failed"); 646 647 /* 648 * Establish the connections assigned to us and add the 649 * established connections to kqueue. 650 */ 651 for (;;) { 652 #ifdef SEND_DEBUG 653 char addr_name[INET_ADDRSTRLEN]; 654 #endif 655 struct timeval readto; 656 struct conn_ctx *conn; 657 struct conn_ack ack; 658 int on; 659 660 pthread_mutex_lock(&ctx->t_lock); 661 while (STAILQ_EMPTY(&ctx->t_conn)) 662 pthread_cond_wait(&ctx->t_cond, &ctx->t_lock); 663 conn = STAILQ_FIRST(&ctx->t_conn); 664 STAILQ_REMOVE_HEAD(&ctx->t_conn, c_link); 665 pthread_mutex_unlock(&ctx->t_lock); 666 667 if (conn->c_in.sin_port == 0) { 668 /* 669 * The marker for 'the end of connection list'. 670 * See the related comment in main thread. 671 * 672 * NOTE: 673 * We reuse the marker as the udata for the 674 * kqueue timer. 675 */ 676 timeo = conn; 677 break; 678 } 679 680 ++nconn; 681 #ifdef SEND_DEBUG 682 fprintf(stderr, "%s %s:%d\n", name, 683 inet_ntop(AF_INET, &conn->c_in.sin_addr, 684 addr_name, sizeof(addr_name)), 685 ntohs(conn->c_in.sin_port)); 686 #endif 687 688 again: 689 conn->c_s = socket(AF_INET, SOCK_STREAM, 0); 690 if (conn->c_s < 0) 691 err(1, "socket failed"); 692 693 if (connect(conn->c_s, (const struct sockaddr *)&conn->c_in, 694 sizeof(conn->c_in)) < 0) 695 err(1, "connect failed"); 696 697 memset(&readto, 0, sizeof(readto)); 698 readto.tv_sec = ctx->t_glob->g_readto_ms / 1000; 699 readto.tv_usec = (ctx->t_glob->g_readto_ms % 1000) * 1000; 700 if (setsockopt(conn->c_s, SOL_SOCKET, SO_RCVTIMEO, &readto, 701 sizeof(readto)) < 0) 702 err(1, "setsockopt(RCVTIMEO) failed"); 703 704 n = read(conn->c_s, &ack, sizeof(ack)); 705 if (n != sizeof(ack)) { 706 if (n < 0) { 707 if (errno == EAGAIN) { 708 close(conn->c_s); 709 goto again; 710 } 711 err(1, "read ack failed"); 712 } else { 713 errx(1, "read truncated ack"); 714 } 715 } 716 717 on = 1; 718 if (ioctl(conn->c_s, FIONBIO, &on, sizeof(on)) < 0) 719 err(1, "ioctl(FIONBIO) failed"); 720 721 EV_SET(&chg_evt, conn->c_s, EVFILT_WRITE, EV_ADD, 0, 0, conn); 722 n = kevent(kq, &chg_evt, 1, NULL, 0, NULL); 723 if (n < 0) 724 err(1, "kevent add failed"); 725 } 726 #ifdef SEND_DEBUG 727 fprintf(stderr, "%s conn %d\n", name, nconn); 728 #endif 729 730 /* 731 * Sender start sync, stage 1: 732 * Wait for connections establishment (slow). 733 */ 734 pthread_mutex_lock(&ctx->t_glob->g_lock); 735 ctx->t_glob->g_nconn -= nconn; 736 pthread_cond_broadcast(&ctx->t_glob->g_cond); 737 while (ctx->t_glob->g_nconn != 0) 738 pthread_cond_wait(&ctx->t_glob->g_cond, &ctx->t_glob->g_lock); 739 pthread_mutex_unlock(&ctx->t_glob->g_lock); 740 741 /* 742 * Sender start sync, stage2. 743 */ 744 /* Increase the g_nwait. */ 745 atomic_add_int(&ctx->t_glob->g_nwait, 1); 746 /* Spin-wait for main thread to release us (reset g_nwait). */ 747 while (ctx->t_glob->g_nwait) 748 send_spinwait(); 749 750 #ifdef SEND_DEBUG 751 fprintf(stderr, "%s start\n", name); 752 #endif 753 754 /* 755 * Wire a kqueue timer, so that the sending can be terminated 756 * as requested. 757 * 758 * NOTE: 759 * Set -2 to c_s for timer udata, so we could distinguish it 760 * from real connections. 761 */ 762 timeo->c_s = -2; 763 EV_SET(&chg_evt, 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0, 764 ctx->t_glob->g_dur * 1000L, timeo); 765 n = kevent(kq, &chg_evt, 1, NULL, 0, NULL); 766 if (n < 0) 767 err(1, "kevent add failed"); 768 769 clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_start); 770 for (;;) { 771 struct kevent evt[SEND_EVENT_MAX]; 772 int nevt, i; 773 774 nevt = kevent(kq, NULL, 0, evt, SEND_EVENT_MAX, NULL); 775 if (nevt < 0) 776 err(1, "kevent failed"); 777 778 for (i = 0; i < nevt; ++i) { 779 struct conn_ctx *conn = evt[i].udata; 780 781 if (conn->c_s < 0) { 782 if (conn->c_s == -2) { 783 /* Timer expired */ 784 goto done; 785 } 786 continue; 787 } 788 789 if (fd >= 0) { 790 off_t m, off; 791 size_t len; 792 793 off = conn->c_stat % buflen; 794 len = buflen - off; 795 796 n = sendfile(fd, conn->c_s, off, len, NULL, 797 &m, 0); 798 if (n == 0 || (n < 0 && errno == EAGAIN)) 799 n = m; 800 } else { 801 n = write(conn->c_s, buf, buflen); 802 } 803 804 if (n < 0) { 805 if (errno != EAGAIN) { 806 conn->c_err = errno; 807 clock_gettime(CLOCK_MONOTONIC_PRECISE, 808 &conn->c_terr); 809 close(conn->c_s); 810 conn->c_s = -1; 811 } 812 } else { 813 conn->c_stat += n; 814 } 815 } 816 } 817 done: 818 clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_end); 819 820 if (fd >= 0) 821 close(fd); 822 if (buf != NULL) 823 free(buf); 824 return NULL; 825 } 826