1 #include <sys/param.h> 2 #include <sys/event.h> 3 #include <sys/ioctl.h> 4 #include <sys/queue.h> 5 #include <sys/socket.h> 6 #include <sys/sysctl.h> 7 #include <sys/time.h> 8 9 #include <machine/atomic.h> 10 #ifdef __FreeBSD__ 11 #include <machine/cpu.h> 12 #endif 13 #include <machine/cpufunc.h> 14 15 #include <arpa/inet.h> 16 #include <netinet/in.h> 17 18 #include <err.h> 19 #include <errno.h> 20 #include <pthread.h> 21 #include <pthread_np.h> 22 #include <signal.h> 23 #include <stdbool.h> 24 #include <stdio.h> 25 #include <stdint.h> 26 #include <stdlib.h> 27 #include <string.h> 28 #include <unistd.h> 29 30 #include "kq_sendrecv_proto.h" 31 32 /* 33 * Note about the sender start synchronization. 34 * 35 * We apply two stage synchronization. The first stage uses pthread 36 * condition (it sleeps), which waits for the establishment for all 37 * connections, which could be slow. The second stage uses g_nwait 38 * of send_globctx; all relevant threads spin on g_nwait. The main 39 * thread spin-waits for all senders to increase g_nwait. The sender 40 * thread increases the g_nwait, then it spin-waits for main thread 41 * to reset g_nwait. In this way, we can make sure that all senders 42 * start roughly at the same time. 43 */ 44 45 #ifndef timespecsub 46 #define timespecsub(vvp, uvp) \ 47 do { \ 48 (vvp)->tv_sec -= (uvp)->tv_sec; \ 49 (vvp)->tv_nsec -= (uvp)->tv_nsec; \ 50 if ((vvp)->tv_nsec < 0) { \ 51 (vvp)->tv_sec--; \ 52 (vvp)->tv_nsec += 1000000000; \ 53 } \ 54 } while (0) 55 #endif 56 57 #ifndef timespeccmp 58 #define timespeccmp(tvp, uvp, cmp) \ 59 (((tvp)->tv_sec == (uvp)->tv_sec) ? \ 60 ((tvp)->tv_nsec cmp (uvp)->tv_nsec) : \ 61 ((tvp)->tv_sec cmp (uvp)->tv_sec)) 62 #endif 63 64 #if 0 65 #define SEND_DEBUG 66 #endif 67 #if 0 68 #define SEND_TIME_DEBUG 69 #endif 70 71 #define SEND_DUR 10 72 #define SEND_EVENT_MAX 64 73 #define SEND_BUFLEN (128 * 1024) 74 75 /* 76 * The successful 3-way handshake on the connection does not mean the 77 * remote application can accept(2) this connection. Even worse, the 78 * remote side's network stack may drop the connection silently, i.e. 79 * w/o RST. If this happened, the blocking read(2) would not return, 80 * until the keepalive kicked in, which would take quite some time. 81 * This is obviously not what we want here, so use synthetic timeout 82 * for blocking read(2). Here, we will retry if a blocking read(2) 83 * times out. 84 */ 85 #define SEND_READTO_MS 1000 /* unit: ms */ 86 87 #if defined(__DragonFly__) 88 #define SEND_CONN_CTX_ALIGN __VM_CACHELINE_SIZE 89 #elif defined(__FreeBSD__) 90 #define SEND_CONN_CTX_ALIGN CACHE_LINE_SIZE 91 #else 92 #define SEND_CONN_CTX_ALIGN 64 /* XXX */ 93 #endif 94 95 struct conn_ctx { 96 int c_s; 97 int c_err; 98 uint64_t c_stat; 99 struct timespec c_terr; 100 101 STAILQ_ENTRY(conn_ctx) c_glob_link; 102 STAILQ_ENTRY(conn_ctx) c_link; 103 struct sockaddr_in c_in; 104 int c_thr_id; 105 } __aligned(SEND_CONN_CTX_ALIGN); 106 107 STAILQ_HEAD(conn_ctx_list, conn_ctx); 108 109 struct send_globctx { 110 struct conn_ctx_list g_conn; 111 112 int g_dur; 113 int g_nconn; 114 pthread_mutex_t g_lock; 115 pthread_cond_t g_cond; 116 117 volatile u_int g_nwait; 118 int g_readto_ms; /* unit: ms */ 119 int g_buflen; 120 bool g_sendfile; 121 }; 122 123 struct send_thrctx { 124 struct conn_ctx_list t_conn; 125 pthread_mutex_t t_lock; 126 pthread_cond_t t_cond; 127 128 struct send_globctx *t_glob; 129 struct timespec t_start; 130 struct timespec t_end; 131 double t_run_us; /* unit: us */ 132 133 pthread_t t_tid; 134 int t_id; 135 }; 136 137 static void send_build_addrlist(const struct sockaddr_in *, int, 138 const struct sockaddr_in **, int *, int); 139 static void *send_thread(void *); 140 141 static __inline void 142 send_spinwait(void) 143 { 144 #if defined(__DragonFly__) 145 cpu_pause(); 146 #elif defined(__FreeBSD__) 147 cpu_spinwait(); 148 #else 149 /* XXX nothing */ 150 #endif 151 } 152 153 static void 154 usage(const char *cmd) 155 { 156 fprintf(stderr, "%s -4 addr4 [-4 addr4 ...] [-p port] " 157 "-c conns [-t nthreads] [-l sec] [-r readto_ms] [-S] [-E] " 158 "[-b buflen] [-B]\n", cmd); 159 exit(2); 160 } 161 162 int 163 main(int argc, char *argv[]) 164 { 165 struct send_globctx glob; 166 struct send_thrctx *ctx_arr, *ctx; 167 struct sockaddr_in *in_arr, *in; 168 const struct sockaddr_in *daddr; 169 struct timespec run, end, start; 170 double total_run_us, total, conn_min, conn_max; 171 double jain, jain_res; 172 int jain_cnt; 173 struct conn_ctx *conn; 174 sigset_t sigset; 175 int opt, i, ncpus; 176 int in_arr_cnt, in_arr_sz, ndaddr; 177 int nthr, nconn, dur, readto_ms, buflen; 178 int log_err, err_cnt, has_minmax; 179 u_short port = RECV_PORT; 180 uint32_t idx; 181 size_t sz; 182 bool do_sendfile = false, bindcpu = false; 183 184 sigemptyset(&sigset); 185 sigaddset(&sigset, SIGPIPE); 186 if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0) 187 err(1, "sigprocmask failed"); 188 189 sz = sizeof(ncpus); 190 if (sysctlbyname("hw.ncpu", &ncpus, &sz, NULL, 0) < 0) 191 err(1, "sysctl hw.ncpu failed"); 192 nthr = ncpus; 193 194 in_arr_sz = 4; 195 in_arr_cnt = 0; 196 in_arr = malloc(in_arr_sz * sizeof(struct sockaddr_in)); 197 if (in_arr == NULL) 198 err(1, "malloc failed"); 199 200 log_err = 0; 201 nconn = 0; 202 dur = SEND_DUR; 203 readto_ms = SEND_READTO_MS; 204 buflen = SEND_BUFLEN; 205 206 while ((opt = getopt(argc, argv, "4:BESb:c:l:p:r:t:")) != -1) { 207 switch (opt) { 208 case '4': 209 if (in_arr_cnt == in_arr_sz) { 210 in_arr_sz *= 2; 211 in_arr = reallocf(in_arr, 212 in_arr_sz * sizeof(struct sockaddr_in)); 213 if (in_arr == NULL) 214 err(1, "reallocf failed"); 215 } 216 in = &in_arr[in_arr_cnt]; 217 ++in_arr_cnt; 218 219 memset(in, 0, sizeof(*in)); 220 in->sin_family = AF_INET; 221 if (inet_pton(AF_INET, optarg, &in->sin_addr) <= 0) 222 errx(1, "inet_pton failed %s", optarg); 223 break; 224 225 case 'B': 226 bindcpu = true; 227 break; 228 229 case 'E': 230 log_err = 1; 231 break; 232 233 case 'S': 234 do_sendfile = true; 235 break; 236 237 case 'b': 238 buflen = strtol(optarg, NULL, 10); 239 if (buflen <= 0) 240 errx(1, "invalid -b"); 241 break; 242 243 case 'c': 244 nconn = strtol(optarg, NULL, 10); 245 if (nconn <= 0) 246 errx(1, "invalid -c"); 247 break; 248 249 case 'l': 250 dur = strtoul(optarg, NULL, 10); 251 if (dur == 0) 252 errx(1, "invalid -l"); 253 break; 254 255 case 'p': 256 port = strtoul(optarg, NULL, 10); 257 break; 258 259 case 'r': 260 readto_ms = strtol(optarg, NULL, 10); 261 if (readto_ms <= 0) 262 errx(1, "invalid -r"); 263 break; 264 265 case 't': 266 nthr = strtol(optarg, NULL, 10); 267 if (nthr <= 0) 268 errx(1, "invalid -t"); 269 break; 270 271 default: 272 usage(argv[0]); 273 } 274 } 275 if (in_arr_cnt == 0 || nconn == 0) 276 errx(1, "either -4 or -c are specified"); 277 278 if (nthr > nconn) 279 nthr = nconn; 280 281 for (i = 0; i < in_arr_cnt; ++i) 282 in_arr[i].sin_port = htons(port); 283 284 ctx_arr = calloc(nthr, sizeof(struct send_thrctx)); 285 if (ctx_arr == NULL) 286 err(1, "calloc failed"); 287 288 memset(&glob, 0, sizeof(glob)); 289 STAILQ_INIT(&glob.g_conn); 290 glob.g_nconn = nconn; 291 glob.g_nwait = 1; /* count self in */ 292 glob.g_dur = dur; 293 glob.g_readto_ms = readto_ms; 294 glob.g_sendfile = do_sendfile; 295 glob.g_buflen = buflen; 296 pthread_mutex_init(&glob.g_lock, NULL); 297 pthread_cond_init(&glob.g_cond, NULL); 298 299 pthread_set_name_np(pthread_self(), "main"); 300 301 /* Build receiver address list */ 302 send_build_addrlist(in_arr, in_arr_cnt, &daddr, &ndaddr, readto_ms); 303 304 /* 305 * Start senders. 306 */ 307 for (i = 0; i < nthr; ++i) { 308 pthread_attr_t attr; 309 int error; 310 311 ctx = &ctx_arr[i]; 312 STAILQ_INIT(&ctx->t_conn); 313 ctx->t_id = i; 314 ctx->t_glob = &glob; 315 pthread_mutex_init(&ctx->t_lock, NULL); 316 pthread_cond_init(&ctx->t_cond, NULL); 317 318 pthread_attr_init(&attr); 319 if (bindcpu) { 320 #ifdef __FreeBSD__ 321 cpuset_t mask; 322 #else 323 cpu_set_t mask; 324 #endif 325 326 CPU_ZERO(&mask); 327 CPU_SET(i % ncpus, &mask); 328 error = pthread_attr_setaffinity_np(&attr, 329 sizeof(mask), &mask); 330 if (error) { 331 errc(1, error, "pthread_attr_setaffinity_np " 332 "failed"); 333 } 334 } 335 336 error = pthread_create(&ctx->t_tid, &attr, send_thread, ctx); 337 if (error) 338 errc(1, error, "pthread_create failed"); 339 pthread_attr_destroy(&attr); 340 } 341 342 /* 343 * Distribute connections to senders. 344 * 345 * NOTE: 346 * We start from a random position in the address list, so that the 347 * first several receiving servers will not be abused, if the number 348 * of connections is small and there are many clients. 349 */ 350 idx = arc4random_uniform(ndaddr); 351 for (i = 0; i < nconn; ++i) { 352 const struct sockaddr_in *da; 353 354 da = &daddr[idx % ndaddr]; 355 ++idx; 356 357 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn)); 358 if (conn == NULL) 359 err(1, "aligned_alloc failed"); 360 memset(conn, 0, sizeof(*conn)); 361 conn->c_in = *da; 362 conn->c_s = -1; 363 364 ctx = &ctx_arr[i % nthr]; 365 conn->c_thr_id = ctx->t_id; 366 367 pthread_mutex_lock(&ctx->t_lock); 368 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link); 369 pthread_mutex_unlock(&ctx->t_lock); 370 pthread_cond_signal(&ctx->t_cond); 371 372 /* Add to the global list for results gathering */ 373 STAILQ_INSERT_TAIL(&glob.g_conn, conn, c_glob_link); 374 } 375 376 /* 377 * No more connections; notify the senders. 378 * 379 * NOTE: 380 * The marker for 'the end of connection list' has 0 in its 381 * c_in.sin_port. 382 */ 383 for (i = 0; i < nthr; ++i) { 384 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn)); 385 if (conn == NULL) 386 err(1, "aligned_alloc failed"); 387 memset(conn, 0, sizeof(*conn)); 388 conn->c_s = -1; 389 390 ctx = &ctx_arr[i]; 391 pthread_mutex_lock(&ctx->t_lock); 392 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link); 393 pthread_mutex_unlock(&ctx->t_lock); 394 pthread_cond_signal(&ctx->t_cond); 395 } 396 397 /* 398 * Sender start sync, stage 1: 399 * Wait for connections establishment (slow). 400 */ 401 pthread_mutex_lock(&glob.g_lock); 402 while (glob.g_nconn != 0) 403 pthread_cond_wait(&glob.g_cond, &glob.g_lock); 404 pthread_mutex_unlock(&glob.g_lock); 405 406 /* 407 * Sender start sync, stage 2: 408 * Wait for senders to spin-wait; and once all senders spin-wait, 409 * release them by resetting g_nwait. 410 */ 411 while (atomic_cmpset_int(&glob.g_nwait, nthr + 1, 0) == 0) 412 send_spinwait(); 413 414 fprintf(stderr, "start %d seconds sending test: %d threads, " 415 "%d connections\n", dur, nthr, nconn); 416 417 /* 418 * Wait for the senders to finish and gather the results. 419 */ 420 421 memset(&end, 0, sizeof(end)); /* XXX stupid gcc warning */ 422 memset(&start, 0, sizeof(start)); /* XXX stupid gcc warning */ 423 424 for (i = 0; i < nthr; ++i) { 425 ctx = &ctx_arr[i]; 426 pthread_join(ctx->t_tid, NULL); 427 428 run = ctx->t_end; 429 timespecsub(&run, &ctx->t_start); 430 ctx->t_run_us = ((double)run.tv_sec * 1000000.0) + 431 ((double)run.tv_nsec / 1000.0); 432 433 if (i == 0) { 434 start = ctx->t_start; 435 end = ctx->t_end; 436 } else { 437 if (timespeccmp(&start, &ctx->t_start, >)) 438 start = ctx->t_start; 439 if (timespeccmp(&end, &ctx->t_end, <)) 440 end = ctx->t_end; 441 } 442 443 #ifdef SEND_TIME_DEBUG 444 fprintf(stderr, "start %ld.%ld, end %ld.%ld\n", 445 ctx->t_start.tv_sec, ctx->t_start.tv_nsec, 446 ctx->t_end.tv_sec, ctx->t_end.tv_nsec); 447 #endif 448 } 449 450 #ifdef SEND_TIME_DEBUG 451 fprintf(stderr, "start %ld.%ld, end %ld.%ld (final)\n", 452 start.tv_sec, start.tv_nsec, end.tv_sec, end.tv_nsec); 453 #endif 454 455 run = end; 456 timespecsub(&run, &start); 457 total_run_us = ((double)run.tv_sec * 1000000.0) + 458 ((double)run.tv_nsec / 1000.0); 459 total = 0.0; 460 461 err_cnt = 0; 462 has_minmax = 0; 463 conn_min = 0.0; 464 conn_max = 0.0; 465 466 jain = 0.0; 467 jain_res = 0.0; 468 jain_cnt = 0; 469 470 STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) { 471 total += conn->c_stat; 472 if (conn->c_err == 0) { 473 double perf; /* unit: Mbps */ 474 475 perf = (conn->c_stat * 8.0) / 476 ctx_arr[conn->c_thr_id].t_run_us; 477 if (!has_minmax) { 478 conn_min = perf; 479 conn_max = perf; 480 has_minmax = 1; 481 } else { 482 if (perf > conn_max) 483 conn_max = perf; 484 if (perf < conn_min) 485 conn_min = perf; 486 } 487 jain += (perf * perf); 488 jain_res += perf; 489 ++jain_cnt; 490 } else { 491 ++err_cnt; 492 } 493 } 494 495 jain *= jain_cnt; 496 jain = (jain_res * jain_res) / jain; 497 498 printf("Total: %.2lf Mbps, min/max %.2lf Mbps/%.2lf Mbps, jain %.2lf, " 499 "error %d\n", (total * 8.0) / total_run_us, conn_min, conn_max, 500 jain, err_cnt); 501 502 if (log_err && err_cnt) { 503 STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) { 504 char name[INET_ADDRSTRLEN]; 505 double tmp_run; 506 507 if (conn->c_err == 0) 508 continue; 509 510 run = conn->c_terr; 511 timespecsub(&run, &ctx_arr[conn->c_thr_id].t_start); 512 tmp_run = ((double)run.tv_sec * 1000000.0) + 513 ((double)run.tv_nsec / 1000.0); 514 fprintf(stderr, "snd%d ->%s:%d, %ld sec, %.2lf Mbps, " 515 "errno %d\n", 516 conn->c_thr_id, 517 inet_ntop(AF_INET, &conn->c_in.sin_addr, 518 name, sizeof(name)), 519 ntohs(conn->c_in.sin_port), 520 run.tv_sec, (conn->c_stat * 8.0) / tmp_run, 521 conn->c_err); 522 --err_cnt; 523 if (err_cnt == 0) 524 break; 525 } 526 } 527 528 exit(0); 529 } 530 531 static void 532 send_build_addrlist(const struct sockaddr_in *in_arr, int in_arr_cnt, 533 const struct sockaddr_in **daddr0, int *ndaddr0, int readto_ms) 534 { 535 struct sockaddr_in *daddr; 536 struct timeval readto; 537 int i, ndaddr; 538 539 daddr = NULL; 540 ndaddr = 0; 541 542 memset(&readto, 0, sizeof(readto)); 543 readto.tv_sec = readto_ms / 1000; 544 readto.tv_usec = (readto_ms % 1000) * 1000; 545 546 for (i = 0; i < in_arr_cnt; ++i) { 547 const struct sockaddr_in *in = &in_arr[i]; 548 struct recv_info info_hdr; 549 uint16_t *ports; 550 int s, n, ports_sz, d; 551 552 again: 553 s = socket(AF_INET, SOCK_STREAM, 0); 554 if (s < 0) 555 err(1, "socket failed"); 556 557 if (connect(s, (const struct sockaddr *)in, sizeof(*in)) < 0) 558 err(1, "connect failed"); 559 560 if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, 561 &readto, sizeof(readto)) < 0) 562 err(1, "setsockopt(RCVTIMEO) failed"); 563 564 n = read(s, &info_hdr, sizeof(info_hdr)); 565 if (n != sizeof(info_hdr)) { 566 if (n < 0) { 567 if (errno == EAGAIN) { 568 close(s); 569 goto again; 570 } 571 err(1, "read info hdr failed"); 572 } else { 573 errx(1, "read truncated info hdr"); 574 } 575 } 576 if (info_hdr.ndport == 0) { 577 close(s); 578 continue; 579 } 580 581 ports_sz = info_hdr.ndport * sizeof(uint16_t); 582 ports = malloc(ports_sz); 583 if (ports == NULL) 584 err(1, "malloc failed"); 585 586 n = read(s, ports, ports_sz); 587 if (n != ports_sz) { 588 if (n < 0) { 589 if (errno == EAGAIN) { 590 free(ports); 591 close(s); 592 goto again; 593 } 594 err(1, "read ports failed"); 595 } else { 596 errx(1, "read truncated ports"); 597 } 598 } 599 600 daddr = reallocf(daddr, 601 (ndaddr + info_hdr.ndport) * sizeof(struct sockaddr_in)); 602 if (daddr == NULL) 603 err(1, "reallocf failed"); 604 605 for (d = ndaddr; d < ndaddr + info_hdr.ndport; ++d) { 606 struct sockaddr_in *da = &daddr[d]; 607 608 *da = *in; 609 da->sin_port = ports[d - ndaddr]; 610 } 611 ndaddr += info_hdr.ndport; 612 613 free(ports); 614 close(s); 615 } 616 617 #ifdef SEND_DEBUG 618 for (i = 0; i < ndaddr; ++i) { 619 const struct sockaddr_in *da = &daddr[i]; 620 char name[INET_ADDRSTRLEN]; 621 622 fprintf(stderr, "%s:%d\n", 623 inet_ntop(AF_INET, &da->sin_addr, name, sizeof(name)), 624 ntohs(da->sin_port)); 625 } 626 #endif 627 628 *daddr0 = daddr; 629 *ndaddr0 = ndaddr; 630 } 631 632 static void * 633 send_thread(void *xctx) 634 { 635 struct send_thrctx *ctx = xctx; 636 struct conn_ctx *timeo; 637 struct kevent chg_evt; 638 uint8_t *buf; 639 int nconn = 0, kq, n, fd = -1, buflen; 640 char name[32]; 641 642 snprintf(name, sizeof(name), "snd%d", ctx->t_id); 643 pthread_set_name_np(pthread_self(), name); 644 645 buflen = ctx->t_glob->g_buflen; 646 buf = malloc(buflen); 647 if (buf == NULL) 648 err(1, "malloc(%d) failed", buflen); 649 650 if (ctx->t_glob->g_sendfile) { 651 char filename[] = "sendtmpXXX"; 652 653 fd = mkstemp(filename); 654 if (fd < 0) 655 err(1, "mkstemp failed"); 656 if (write(fd, buf, buflen) != buflen) 657 err(1, "write to file failed"); 658 unlink(filename); 659 free(buf); 660 buf = NULL; 661 } 662 663 kq = kqueue(); 664 if (kq < 0) 665 err(1, "kqueue failed"); 666 667 /* 668 * Establish the connections assigned to us and add the 669 * established connections to kqueue. 670 */ 671 for (;;) { 672 #ifdef SEND_DEBUG 673 char addr_name[INET_ADDRSTRLEN]; 674 #endif 675 struct timeval readto; 676 struct conn_ctx *conn; 677 struct conn_ack ack; 678 int on; 679 680 pthread_mutex_lock(&ctx->t_lock); 681 while (STAILQ_EMPTY(&ctx->t_conn)) 682 pthread_cond_wait(&ctx->t_cond, &ctx->t_lock); 683 conn = STAILQ_FIRST(&ctx->t_conn); 684 STAILQ_REMOVE_HEAD(&ctx->t_conn, c_link); 685 pthread_mutex_unlock(&ctx->t_lock); 686 687 if (conn->c_in.sin_port == 0) { 688 /* 689 * The marker for 'the end of connection list'. 690 * See the related comment in main thread. 691 * 692 * NOTE: 693 * We reuse the marker as the udata for the 694 * kqueue timer. 695 */ 696 timeo = conn; 697 break; 698 } 699 700 ++nconn; 701 #ifdef SEND_DEBUG 702 fprintf(stderr, "%s %s:%d\n", name, 703 inet_ntop(AF_INET, &conn->c_in.sin_addr, 704 addr_name, sizeof(addr_name)), 705 ntohs(conn->c_in.sin_port)); 706 #endif 707 708 again: 709 conn->c_s = socket(AF_INET, SOCK_STREAM, 0); 710 if (conn->c_s < 0) 711 err(1, "socket failed"); 712 713 if (connect(conn->c_s, (const struct sockaddr *)&conn->c_in, 714 sizeof(conn->c_in)) < 0) 715 err(1, "connect failed"); 716 717 memset(&readto, 0, sizeof(readto)); 718 readto.tv_sec = ctx->t_glob->g_readto_ms / 1000; 719 readto.tv_usec = (ctx->t_glob->g_readto_ms % 1000) * 1000; 720 if (setsockopt(conn->c_s, SOL_SOCKET, SO_RCVTIMEO, &readto, 721 sizeof(readto)) < 0) 722 err(1, "setsockopt(RCVTIMEO) failed"); 723 724 n = read(conn->c_s, &ack, sizeof(ack)); 725 if (n != sizeof(ack)) { 726 if (n < 0) { 727 if (errno == EAGAIN) { 728 close(conn->c_s); 729 goto again; 730 } 731 err(1, "read ack failed"); 732 } else { 733 errx(1, "read truncated ack"); 734 } 735 } 736 737 on = 1; 738 if (ioctl(conn->c_s, FIONBIO, &on, sizeof(on)) < 0) 739 err(1, "ioctl(FIONBIO) failed"); 740 741 EV_SET(&chg_evt, conn->c_s, EVFILT_WRITE, EV_ADD, 0, 0, conn); 742 n = kevent(kq, &chg_evt, 1, NULL, 0, NULL); 743 if (n < 0) 744 err(1, "kevent add failed"); 745 } 746 #ifdef SEND_DEBUG 747 fprintf(stderr, "%s conn %d\n", name, nconn); 748 #endif 749 750 /* 751 * Sender start sync, stage 1: 752 * Wait for connections establishment (slow). 753 */ 754 pthread_mutex_lock(&ctx->t_glob->g_lock); 755 ctx->t_glob->g_nconn -= nconn; 756 pthread_cond_broadcast(&ctx->t_glob->g_cond); 757 while (ctx->t_glob->g_nconn != 0) 758 pthread_cond_wait(&ctx->t_glob->g_cond, &ctx->t_glob->g_lock); 759 pthread_mutex_unlock(&ctx->t_glob->g_lock); 760 761 /* 762 * Sender start sync, stage2. 763 */ 764 /* Increase the g_nwait. */ 765 atomic_add_int(&ctx->t_glob->g_nwait, 1); 766 /* Spin-wait for main thread to release us (reset g_nwait). */ 767 while (ctx->t_glob->g_nwait) 768 send_spinwait(); 769 770 #ifdef SEND_DEBUG 771 fprintf(stderr, "%s start\n", name); 772 #endif 773 774 /* 775 * Wire a kqueue timer, so that the sending can be terminated 776 * as requested. 777 * 778 * NOTE: 779 * Set -2 to c_s for timer udata, so we could distinguish it 780 * from real connections. 781 */ 782 timeo->c_s = -2; 783 EV_SET(&chg_evt, 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0, 784 ctx->t_glob->g_dur * 1000L, timeo); 785 n = kevent(kq, &chg_evt, 1, NULL, 0, NULL); 786 if (n < 0) 787 err(1, "kevent add failed"); 788 789 clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_start); 790 for (;;) { 791 struct kevent evt[SEND_EVENT_MAX]; 792 int nevt, i; 793 794 nevt = kevent(kq, NULL, 0, evt, SEND_EVENT_MAX, NULL); 795 if (nevt < 0) 796 err(1, "kevent failed"); 797 798 for (i = 0; i < nevt; ++i) { 799 struct conn_ctx *conn = evt[i].udata; 800 801 if (conn->c_s < 0) { 802 if (conn->c_s == -2) { 803 /* Timer expired */ 804 goto done; 805 } 806 continue; 807 } 808 809 if (fd >= 0) { 810 off_t m, off; 811 size_t len; 812 813 off = conn->c_stat % buflen; 814 len = buflen - off; 815 816 n = sendfile(fd, conn->c_s, off, len, NULL, 817 &m, 0); 818 if (n == 0 || (n < 0 && errno == EAGAIN)) 819 n = m; 820 } else { 821 n = write(conn->c_s, buf, buflen); 822 } 823 824 if (n < 0) { 825 if (errno != EAGAIN) { 826 conn->c_err = errno; 827 clock_gettime(CLOCK_MONOTONIC_PRECISE, 828 &conn->c_terr); 829 close(conn->c_s); 830 conn->c_s = -1; 831 } 832 } else { 833 conn->c_stat += n; 834 } 835 } 836 } 837 done: 838 clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_end); 839 840 if (fd >= 0) 841 close(fd); 842 if (buf != NULL) 843 free(buf); 844 return NULL; 845 } 846