1 #include <sys/param.h> 2 #include <sys/event.h> 3 #include <sys/ioctl.h> 4 #include <sys/queue.h> 5 #include <sys/socket.h> 6 #include <sys/sysctl.h> 7 #include <sys/time.h> 8 9 #include <machine/atomic.h> 10 #ifdef __FreeBSD__ 11 #include <machine/cpu.h> 12 #endif 13 #include <machine/cpufunc.h> 14 15 #include <arpa/inet.h> 16 #include <netinet/in.h> 17 18 #include <err.h> 19 #include <errno.h> 20 #include <pthread.h> 21 #include <pthread_np.h> 22 #include <signal.h> 23 #include <stdio.h> 24 #include <stdint.h> 25 #include <stdlib.h> 26 #include <string.h> 27 #include <unistd.h> 28 29 #include "kq_sendrecv_proto.h" 30 31 /* 32 * Note about the sender start synchronization. 33 * 34 * We apply two stage synchronization. The first stage uses pthread 35 * condition (it sleeps), which waits for the establishment for all 36 * connections, which could be slow. The second stage uses g_nwait 37 * of send_globctx; all relevant threads spin on g_nwait. The main 38 * thread spin-waits for all senders to increase g_nwait. The sender 39 * thread increases the g_nwait, then it spin-waits for main thread 40 * to reset g_nwait. In this way, we can make sure that all senders 41 * start roughly at the same time. 42 */ 43 44 #ifndef timespecsub 45 #define timespecsub(vvp, uvp) \ 46 do { \ 47 (vvp)->tv_sec -= (uvp)->tv_sec; \ 48 (vvp)->tv_nsec -= (uvp)->tv_nsec; \ 49 if ((vvp)->tv_nsec < 0) { \ 50 (vvp)->tv_sec--; \ 51 (vvp)->tv_nsec += 1000000000; \ 52 } \ 53 } while (0) 54 #endif 55 56 #ifndef timespeccmp 57 #define timespeccmp(tvp, uvp, cmp) \ 58 (((tvp)->tv_sec == (uvp)->tv_sec) ? \ 59 ((tvp)->tv_nsec cmp (uvp)->tv_nsec) : \ 60 ((tvp)->tv_sec cmp (uvp)->tv_sec)) 61 #endif 62 63 #if 0 64 #define SEND_DEBUG 65 #endif 66 #if 0 67 #define SEND_TIME_DEBUG 68 #endif 69 70 #define SEND_DUR 10 71 #define SEND_EVENT_MAX 64 72 #define SEND_BUFLEN (128 * 1024) 73 74 /* 75 * The successful 3-way handshake on the connection does not mean the 76 * remote application can accept(2) this connection. Even worse, the 77 * remote side's network stack may drop the connection silently, i.e. 78 * w/o RST. If this happened, the blocking read(2) would not return, 79 * until the keepalive kicked in, which would take quite some time. 80 * This is obviously not what we want here, so use synthetic timeout 81 * for blocking read(2). Here, we will retry if a blocking read(2) 82 * times out. 83 */ 84 #define SEND_READTO_MS 1000 /* unit: ms */ 85 86 #if defined(__DragonFly__) 87 #define SEND_CONN_CTX_ALIGN __VM_CACHELINE_SIZE 88 #elif defined(__FreeBSD__) 89 #define SEND_CONN_CTX_ALIGN CACHE_LINE_SIZE 90 #else 91 #define SEND_CONN_CTX_ALIGN 64 /* XXX */ 92 #endif 93 94 struct conn_ctx { 95 int c_s; 96 int c_err; 97 uint64_t c_stat; 98 struct timespec c_terr; 99 100 STAILQ_ENTRY(conn_ctx) c_glob_link; 101 STAILQ_ENTRY(conn_ctx) c_link; 102 struct sockaddr_in c_in; 103 int c_thr_id; 104 } __aligned(SEND_CONN_CTX_ALIGN); 105 106 STAILQ_HEAD(conn_ctx_list, conn_ctx); 107 108 struct send_globctx { 109 struct conn_ctx_list g_conn; 110 111 int g_dur; 112 int g_nconn; 113 pthread_mutex_t g_lock; 114 pthread_cond_t g_cond; 115 116 volatile u_int g_nwait; 117 int g_readto_ms; /* unit: ms */ 118 }; 119 120 struct send_thrctx { 121 struct conn_ctx_list t_conn; 122 pthread_mutex_t t_lock; 123 pthread_cond_t t_cond; 124 125 struct send_globctx *t_glob; 126 struct timespec t_start; 127 struct timespec t_end; 128 double t_run_us; /* unit: us */ 129 130 pthread_t t_tid; 131 int t_id; 132 }; 133 134 static void send_build_addrlist(const struct sockaddr_in *, int, 135 const struct sockaddr_in **, int *, int); 136 static void *send_thread(void *); 137 138 static __inline void 139 send_spinwait(void) 140 { 141 #if defined(__DragonFly__) 142 cpu_pause(); 143 #elif defined(__FreeBSD__) 144 cpu_spinwait(); 145 #else 146 /* XXX nothing */ 147 #endif 148 } 149 150 static void 151 usage(const char *cmd) 152 { 153 fprintf(stderr, "%s -4 addr4 [-4 addr4 ...] [-p port] " 154 "-c conns [-t nthreads] [-l sec] [-r readto_ms] [-E]\n", cmd); 155 exit(2); 156 } 157 158 int 159 main(int argc, char *argv[]) 160 { 161 struct send_globctx glob; 162 struct send_thrctx *ctx_arr, *ctx; 163 struct sockaddr_in *in_arr, *in; 164 const struct sockaddr_in *daddr; 165 struct timespec run, end, start; 166 double total_run_us, total, conn_min, conn_max; 167 double jain, jain_res; 168 int jain_cnt; 169 struct conn_ctx *conn; 170 sigset_t sigset; 171 int opt, i; 172 int in_arr_cnt, in_arr_sz, ndaddr; 173 int nthr, nconn, dur, readto_ms; 174 int log_err, err_cnt, has_minmax; 175 u_short port = RECV_PORT; 176 uint32_t idx; 177 size_t sz; 178 179 sigemptyset(&sigset); 180 sigaddset(&sigset, SIGPIPE); 181 if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0) 182 err(1, "sigprocmask failed"); 183 184 sz = sizeof(nthr); 185 if (sysctlbyname("hw.ncpu", &nthr, &sz, NULL, 0) < 0) 186 err(1, "sysctl hw.ncpu failed"); 187 188 in_arr_sz = 4; 189 in_arr_cnt = 0; 190 in_arr = malloc(in_arr_sz * sizeof(struct sockaddr_in)); 191 if (in_arr == NULL) 192 err(1, "malloc failed"); 193 194 log_err = 0; 195 nconn = 0; 196 dur = SEND_DUR; 197 readto_ms = SEND_READTO_MS; 198 199 while ((opt = getopt(argc, argv, "4:Ec:l:p:r:t:")) != -1) { 200 switch (opt) { 201 case '4': 202 if (in_arr_cnt == in_arr_sz) { 203 in_arr_sz *= 2; 204 in_arr = reallocf(in_arr, 205 in_arr_sz * sizeof(struct sockaddr_in)); 206 if (in_arr == NULL) 207 err(1, "reallocf failed"); 208 } 209 in = &in_arr[in_arr_cnt]; 210 ++in_arr_cnt; 211 212 memset(in, 0, sizeof(*in)); 213 in->sin_family = AF_INET; 214 if (inet_pton(AF_INET, optarg, &in->sin_addr) <= 0) 215 errx(1, "inet_pton failed %s", optarg); 216 break; 217 218 case 'E': 219 log_err = 1; 220 break; 221 222 case 'c': 223 nconn = strtol(optarg, NULL, 10); 224 if (nconn <= 0) 225 errx(1, "invalid -c"); 226 break; 227 228 case 'l': 229 dur = strtoul(optarg, NULL, 10); 230 if (dur == 0) 231 errx(1, "invalid -l"); 232 break; 233 234 case 'p': 235 port = strtoul(optarg, NULL, 10); 236 break; 237 238 case 'r': 239 readto_ms = strtol(optarg, NULL, 10); 240 if (readto_ms <= 0) 241 errx(1, "invalid -r"); 242 break; 243 244 case 't': 245 nthr = strtol(optarg, NULL, 10); 246 if (nthr <= 0) 247 errx(1, "invalid -t"); 248 break; 249 250 default: 251 usage(argv[0]); 252 } 253 } 254 if (in_arr_cnt == 0 || nconn == 0) 255 errx(1, "neither -4 nor -c are specified"); 256 257 if (nthr > nconn) 258 nthr = nconn; 259 260 for (i = 0; i < in_arr_cnt; ++i) 261 in_arr[i].sin_port = htons(port); 262 263 ctx_arr = calloc(nthr, sizeof(struct send_thrctx)); 264 if (ctx_arr == NULL) 265 err(1, "calloc failed"); 266 267 memset(&glob, 0, sizeof(glob)); 268 STAILQ_INIT(&glob.g_conn); 269 glob.g_nconn = nconn; 270 glob.g_nwait = 1; /* count self in */ 271 glob.g_dur = dur; 272 glob.g_readto_ms = readto_ms; 273 pthread_mutex_init(&glob.g_lock, NULL); 274 pthread_cond_init(&glob.g_cond, NULL); 275 276 pthread_set_name_np(pthread_self(), "main"); 277 278 /* Build receiver address list */ 279 send_build_addrlist(in_arr, in_arr_cnt, &daddr, &ndaddr, readto_ms); 280 281 /* 282 * Start senders. 283 */ 284 for (i = 0; i < nthr; ++i) { 285 int error; 286 287 ctx = &ctx_arr[i]; 288 STAILQ_INIT(&ctx->t_conn); 289 ctx->t_id = i; 290 ctx->t_glob = &glob; 291 pthread_mutex_init(&ctx->t_lock, NULL); 292 pthread_cond_init(&ctx->t_cond, NULL); 293 294 error = pthread_create(&ctx->t_tid, NULL, send_thread, ctx); 295 if (error) 296 errc(1, error, "pthread_create failed"); 297 } 298 299 /* 300 * Distribute connections to senders. 301 * 302 * NOTE: 303 * We start from a random position in the address list, so that the 304 * first several receiving servers will not be abused, if the number 305 * of connections is small and there are many clients. 306 */ 307 idx = arc4random_uniform(ndaddr); 308 for (i = 0; i < nconn; ++i) { 309 const struct sockaddr_in *da; 310 311 da = &daddr[idx % ndaddr]; 312 ++idx; 313 314 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn)); 315 if (conn == NULL) 316 err(1, "aligned_alloc failed"); 317 memset(conn, 0, sizeof(*conn)); 318 conn->c_in = *da; 319 conn->c_s = -1; 320 321 ctx = &ctx_arr[i % nthr]; 322 conn->c_thr_id = ctx->t_id; 323 324 pthread_mutex_lock(&ctx->t_lock); 325 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link); 326 pthread_mutex_unlock(&ctx->t_lock); 327 pthread_cond_signal(&ctx->t_cond); 328 329 /* Add to the global list for results gathering */ 330 STAILQ_INSERT_TAIL(&glob.g_conn, conn, c_glob_link); 331 } 332 333 /* 334 * No more connections; notify the senders. 335 * 336 * NOTE: 337 * The marker for 'the end of connection list' has 0 in its 338 * c_in.sin_port. 339 */ 340 for (i = 0; i < nthr; ++i) { 341 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn)); 342 if (conn == NULL) 343 err(1, "aligned_alloc failed"); 344 memset(conn, 0, sizeof(*conn)); 345 conn->c_s = -1; 346 347 ctx = &ctx_arr[i]; 348 pthread_mutex_lock(&ctx->t_lock); 349 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link); 350 pthread_mutex_unlock(&ctx->t_lock); 351 pthread_cond_signal(&ctx->t_cond); 352 } 353 354 /* 355 * Sender start sync, stage 1: 356 * Wait for connections establishment (slow). 357 */ 358 pthread_mutex_lock(&glob.g_lock); 359 while (glob.g_nconn != 0) 360 pthread_cond_wait(&glob.g_cond, &glob.g_lock); 361 pthread_mutex_unlock(&glob.g_lock); 362 363 /* 364 * Sender start sync, stage 2: 365 * Wait for senders to spin-wait; and once all senders spin-wait, 366 * release them by resetting g_nwait. 367 */ 368 while (atomic_cmpset_int(&glob.g_nwait, nthr + 1, 0) == 0) 369 send_spinwait(); 370 371 fprintf(stderr, "start %d seconds sending test: %d threads, " 372 "%d connections\n", dur, nthr, nconn); 373 374 /* 375 * Wait for the senders to finish and gather the results. 376 */ 377 378 memset(&end, 0, sizeof(end)); /* XXX stupid gcc warning */ 379 memset(&start, 0, sizeof(start)); /* XXX stupid gcc warning */ 380 381 for (i = 0; i < nthr; ++i) { 382 ctx = &ctx_arr[i]; 383 pthread_join(ctx->t_tid, NULL); 384 385 run = ctx->t_end; 386 timespecsub(&run, &ctx->t_start); 387 ctx->t_run_us = ((double)run.tv_sec * 1000000.0) + 388 ((double)run.tv_nsec / 1000.0); 389 390 if (i == 0) { 391 start = ctx->t_start; 392 end = ctx->t_end; 393 } else { 394 if (timespeccmp(&start, &ctx->t_start, >)) 395 start = ctx->t_start; 396 if (timespeccmp(&end, &ctx->t_end, <)) 397 end = ctx->t_end; 398 } 399 400 #ifdef SEND_TIME_DEBUG 401 fprintf(stderr, "start %ld.%ld, end %ld.%ld\n", 402 ctx->t_start.tv_sec, ctx->t_start.tv_nsec, 403 ctx->t_end.tv_sec, ctx->t_end.tv_nsec); 404 #endif 405 } 406 407 #ifdef SEND_TIME_DEBUG 408 fprintf(stderr, "start %ld.%ld, end %ld.%ld (final)\n", 409 start.tv_sec, start.tv_nsec, end.tv_sec, end.tv_nsec); 410 #endif 411 412 run = end; 413 timespecsub(&run, &start); 414 total_run_us = ((double)run.tv_sec * 1000000.0) + 415 ((double)run.tv_nsec / 1000.0); 416 total = 0.0; 417 418 err_cnt = 0; 419 has_minmax = 0; 420 conn_min = 0.0; 421 conn_max = 0.0; 422 423 jain = 0.0; 424 jain_res = 0.0; 425 jain_cnt = 0; 426 427 STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) { 428 total += conn->c_stat; 429 if (conn->c_err == 0) { 430 double perf; /* unit: Mbps */ 431 432 perf = (conn->c_stat * 8.0) / 433 ctx_arr[conn->c_thr_id].t_run_us; 434 if (!has_minmax) { 435 conn_min = perf; 436 conn_max = perf; 437 has_minmax = 1; 438 } else { 439 if (perf > conn_max) 440 conn_max = perf; 441 if (perf < conn_min) 442 conn_min = perf; 443 } 444 jain += (perf * perf); 445 jain_res += perf; 446 ++jain_cnt; 447 } else { 448 ++err_cnt; 449 } 450 } 451 452 jain *= jain_cnt; 453 jain = (jain_res * jain_res) / jain; 454 455 printf("Total: %.2lf Mbps, min/max %.2lf Mbps/%.2lf Mbps, jain %.2lf, " 456 "error %d\n", (total * 8.0) / total_run_us, conn_min, conn_max, 457 jain, err_cnt); 458 459 if (log_err && err_cnt) { 460 STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) { 461 char name[INET_ADDRSTRLEN]; 462 double tmp_run; 463 464 if (conn->c_err == 0) 465 continue; 466 467 run = conn->c_terr; 468 timespecsub(&run, &ctx_arr[conn->c_thr_id].t_start); 469 tmp_run = ((double)run.tv_sec * 1000000.0) + 470 ((double)run.tv_nsec / 1000.0); 471 fprintf(stderr, "snd%d ->%s:%d, %ld sec, %.2lf Mbps, " 472 "errno %d\n", 473 conn->c_thr_id, 474 inet_ntop(AF_INET, &conn->c_in.sin_addr, 475 name, sizeof(name)), 476 ntohs(conn->c_in.sin_port), 477 run.tv_sec, (conn->c_stat * 8.0) / tmp_run, 478 conn->c_err); 479 --err_cnt; 480 if (err_cnt == 0) 481 break; 482 } 483 } 484 485 exit(0); 486 } 487 488 static void 489 send_build_addrlist(const struct sockaddr_in *in_arr, int in_arr_cnt, 490 const struct sockaddr_in **daddr0, int *ndaddr0, int readto_ms) 491 { 492 struct sockaddr_in *daddr; 493 struct timeval readto; 494 int i, ndaddr; 495 496 daddr = NULL; 497 ndaddr = 0; 498 499 memset(&readto, 0, sizeof(readto)); 500 readto.tv_sec = readto_ms / 1000; 501 readto.tv_usec = (readto_ms % 1000) * 1000; 502 503 for (i = 0; i < in_arr_cnt; ++i) { 504 const struct sockaddr_in *in = &in_arr[i]; 505 struct recv_info info_hdr; 506 uint16_t *ports; 507 int s, n, ports_sz, d; 508 509 again: 510 s = socket(AF_INET, SOCK_STREAM, 0); 511 if (s < 0) 512 err(1, "socket failed"); 513 514 if (connect(s, (const struct sockaddr *)in, sizeof(*in)) < 0) 515 err(1, "connect failed"); 516 517 if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, 518 &readto, sizeof(readto)) < 0) 519 err(1, "setsockopt(RCVTIMEO) failed"); 520 521 n = read(s, &info_hdr, sizeof(info_hdr)); 522 if (n != sizeof(info_hdr)) { 523 if (n < 0) { 524 if (errno == EAGAIN) { 525 close(s); 526 goto again; 527 } 528 err(1, "read info hdr failed"); 529 } else { 530 errx(1, "read truncated info hdr"); 531 } 532 } 533 if (info_hdr.ndport == 0) { 534 close(s); 535 continue; 536 } 537 538 ports_sz = info_hdr.ndport * sizeof(uint16_t); 539 ports = malloc(ports_sz); 540 if (ports == NULL) 541 err(1, "malloc failed"); 542 543 n = read(s, ports, ports_sz); 544 if (n != ports_sz) { 545 if (n < 0) { 546 if (errno == EAGAIN) { 547 free(ports); 548 close(s); 549 goto again; 550 } 551 err(1, "read ports failed"); 552 } else { 553 errx(1, "read truncated ports"); 554 } 555 } 556 557 daddr = reallocf(daddr, 558 (ndaddr + info_hdr.ndport) * sizeof(struct sockaddr_in)); 559 if (daddr == NULL) 560 err(1, "reallocf failed"); 561 562 for (d = ndaddr; d < ndaddr + info_hdr.ndport; ++d) { 563 struct sockaddr_in *da = &daddr[d]; 564 565 *da = *in; 566 da->sin_port = ports[d - ndaddr]; 567 } 568 ndaddr += info_hdr.ndport; 569 570 free(ports); 571 close(s); 572 } 573 574 #ifdef SEND_DEBUG 575 for (i = 0; i < ndaddr; ++i) { 576 const struct sockaddr_in *da = &daddr[i]; 577 char name[INET_ADDRSTRLEN]; 578 579 fprintf(stderr, "%s:%d\n", 580 inet_ntop(AF_INET, &da->sin_addr, name, sizeof(name)), 581 ntohs(da->sin_port)); 582 } 583 #endif 584 585 *daddr0 = daddr; 586 *ndaddr0 = ndaddr; 587 } 588 589 static void * 590 send_thread(void *xctx) 591 { 592 struct send_thrctx *ctx = xctx; 593 struct conn_ctx *timeo; 594 struct kevent chg_evt; 595 uint8_t *buf; 596 int nconn = 0, kq, n; 597 char name[32]; 598 599 snprintf(name, sizeof(name), "snd%d", ctx->t_id); 600 pthread_set_name_np(pthread_self(), name); 601 602 buf = malloc(SEND_BUFLEN); 603 if (buf == NULL) 604 err(1, "malloc failed"); 605 606 kq = kqueue(); 607 if (kq < 0) 608 err(1, "kqueue failed"); 609 610 /* 611 * Establish the connections assigned to us and add the 612 * established connections to kqueue. 613 */ 614 for (;;) { 615 #ifdef SEND_DEBUG 616 char addr_name[INET_ADDRSTRLEN]; 617 #endif 618 struct timeval readto; 619 struct conn_ctx *conn; 620 struct conn_ack ack; 621 int on; 622 623 pthread_mutex_lock(&ctx->t_lock); 624 while (STAILQ_EMPTY(&ctx->t_conn)) 625 pthread_cond_wait(&ctx->t_cond, &ctx->t_lock); 626 conn = STAILQ_FIRST(&ctx->t_conn); 627 STAILQ_REMOVE_HEAD(&ctx->t_conn, c_link); 628 pthread_mutex_unlock(&ctx->t_lock); 629 630 if (conn->c_in.sin_port == 0) { 631 /* 632 * The marker for 'the end of connection list'. 633 * See the related comment in main thread. 634 * 635 * NOTE: 636 * We reuse the marker as the udata for the 637 * kqueue timer. 638 */ 639 timeo = conn; 640 break; 641 } 642 643 ++nconn; 644 #ifdef SEND_DEBUG 645 fprintf(stderr, "%s %s:%d\n", name, 646 inet_ntop(AF_INET, &conn->c_in.sin_addr, 647 addr_name, sizeof(addr_name)), 648 ntohs(conn->c_in.sin_port)); 649 #endif 650 651 again: 652 conn->c_s = socket(AF_INET, SOCK_STREAM, 0); 653 if (conn->c_s < 0) 654 err(1, "socket failed"); 655 656 if (connect(conn->c_s, (const struct sockaddr *)&conn->c_in, 657 sizeof(conn->c_in)) < 0) 658 err(1, "connect failed"); 659 660 memset(&readto, 0, sizeof(readto)); 661 readto.tv_sec = ctx->t_glob->g_readto_ms / 1000; 662 readto.tv_usec = (ctx->t_glob->g_readto_ms % 1000) * 1000; 663 if (setsockopt(conn->c_s, SOL_SOCKET, SO_RCVTIMEO, &readto, 664 sizeof(readto)) < 0) 665 err(1, "setsockopt(RCVTIMEO) failed"); 666 667 n = read(conn->c_s, &ack, sizeof(ack)); 668 if (n != sizeof(ack)) { 669 if (n < 0) { 670 if (errno == EAGAIN) { 671 close(conn->c_s); 672 goto again; 673 } 674 err(1, "read ack failed"); 675 } else { 676 errx(1, "read truncated ack"); 677 } 678 } 679 680 on = 1; 681 if (ioctl(conn->c_s, FIONBIO, &on, sizeof(on)) < 0) 682 err(1, "ioctl(FIONBIO) failed"); 683 684 EV_SET(&chg_evt, conn->c_s, EVFILT_WRITE, EV_ADD, 0, 0, conn); 685 n = kevent(kq, &chg_evt, 1, NULL, 0, NULL); 686 if (n < 0) 687 err(1, "kevent add failed"); 688 } 689 #ifdef SEND_DEBUG 690 fprintf(stderr, "%s conn %d\n", name, nconn); 691 #endif 692 693 /* 694 * Sender start sync, stage 1: 695 * Wait for connections establishment (slow). 696 */ 697 pthread_mutex_lock(&ctx->t_glob->g_lock); 698 ctx->t_glob->g_nconn -= nconn; 699 pthread_cond_broadcast(&ctx->t_glob->g_cond); 700 while (ctx->t_glob->g_nconn != 0) 701 pthread_cond_wait(&ctx->t_glob->g_cond, &ctx->t_glob->g_lock); 702 pthread_mutex_unlock(&ctx->t_glob->g_lock); 703 704 /* 705 * Sender start sync, stage2. 706 */ 707 /* Increase the g_nwait. */ 708 atomic_add_int(&ctx->t_glob->g_nwait, 1); 709 /* Spin-wait for main thread to release us (reset g_nwait). */ 710 while (ctx->t_glob->g_nwait) 711 send_spinwait(); 712 713 #ifdef SEND_DEBUG 714 fprintf(stderr, "%s start\n", name); 715 #endif 716 717 /* 718 * Wire a kqueue timer, so that the sending can be terminated 719 * as requested. 720 * 721 * NOTE: 722 * Set -2 to c_s for timer udata, so we could distinguish it 723 * from real connections. 724 */ 725 timeo->c_s = -2; 726 EV_SET(&chg_evt, 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0, 727 ctx->t_glob->g_dur * 1000L, timeo); 728 n = kevent(kq, &chg_evt, 1, NULL, 0, NULL); 729 if (n < 0) 730 err(1, "kevent add failed"); 731 732 clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_start); 733 for (;;) { 734 struct kevent evt[SEND_EVENT_MAX]; 735 int nevt, i; 736 737 nevt = kevent(kq, NULL, 0, evt, SEND_EVENT_MAX, NULL); 738 if (nevt < 0) 739 err(1, "kevent failed"); 740 741 for (i = 0; i < nevt; ++i) { 742 struct conn_ctx *conn = evt[i].udata; 743 744 if (conn->c_s < 0) { 745 if (conn->c_s == -2) { 746 /* Timer expired */ 747 goto done; 748 } 749 continue; 750 } 751 752 n = write(conn->c_s, buf, SEND_BUFLEN); 753 if (n < 0) { 754 if (errno != EAGAIN) { 755 conn->c_err = errno; 756 clock_gettime(CLOCK_MONOTONIC_PRECISE, 757 &conn->c_terr); 758 close(conn->c_s); 759 conn->c_s = -1; 760 } 761 } else { 762 conn->c_stat += n; 763 } 764 } 765 } 766 done: 767 clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_end); 768 return NULL; 769 } 770