/*
 * fio - the flexible io tester
 *
 * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
 * Copyright (C) 2006-2012 Jens Axboe <axboe@kernel.dk>
 *
 * The license below covers all files distributed with fio unless otherwise
 * noted in the file itself.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 *
 */
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <assert.h>
#include <inttypes.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <math.h>
#include <pthread.h>

#include "fio.h"
#include "smalloc.h"
#include "verify.h"
#include "diskutil.h"
#include "cgroup.h"
#include "profile.h"
#include "lib/rand.h"
#include "lib/memalign.h"
#include "server.h"
#include "lib/getrusage.h"
#include "idletime.h"
#include "err.h"
#include "workqueue.h"
#include "lib/mountcheck.h"
#include "rate-submit.h"
#include "helper_thread.h"
#include "pshared.h"
#include "zone-dist.h"

static struct fio_sem *startup_sem;
static struct flist_head *cgroup_list;
static struct cgroup_mnt *cgroup_mnt;
static int exit_value;
static volatile bool fio_abort;
static unsigned int nr_process = 0;
static unsigned int nr_thread = 0;

struct io_log *agg_io_log[DDIR_RWDIR_CNT];

int groupid = 0;
unsigned int thread_number = 0;
unsigned int nr_segments = 0;
unsigned int cur_segment = 0;
unsigned int stat_number = 0;
int temp_stall_ts;
unsigned long done_secs = 0;
#ifdef PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP
pthread_mutex_t overlap_check = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP;
#else
pthread_mutex_t overlap_check = PTHREAD_MUTEX_INITIALIZER;
#endif

#define JOB_START_TIMEOUT	(5 * 1000)

static void sig_int(int sig)
{
	if (nr_segments) {
		if (is_backend)
			fio_server_got_signal(sig);
		else {
			log_info("\nfio: terminating on signal %d\n", sig);
			log_info_flush();
			exit_value = 128;
		}

		fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL);
	}
}

void sig_show_status(int sig)
{
	show_running_run_stats();
}

static void set_sig_handlers(void)
{
	struct sigaction act;

	memset(&act, 0, sizeof(act));
	act.sa_handler = sig_int;
	act.sa_flags = SA_RESTART;
	sigaction(SIGINT, &act, NULL);

	memset(&act, 0, sizeof(act));
	act.sa_handler = sig_int;
	act.sa_flags = SA_RESTART;
	sigaction(SIGTERM, &act, NULL);

/* Windows uses SIGBREAK as a quit signal from other applications */
#ifdef WIN32
	memset(&act, 0, sizeof(act));
	act.sa_handler = sig_int;
	act.sa_flags = SA_RESTART;
	sigaction(SIGBREAK, &act, NULL);
#endif

	memset(&act, 0, sizeof(act));
	act.sa_handler = sig_show_status;
	act.sa_flags = SA_RESTART;
	sigaction(SIGUSR1, &act, NULL);

	if (is_backend) {
		memset(&act, 0, sizeof(act));
		act.sa_handler = sig_int;
		act.sa_flags = SA_RESTART;
		sigaction(SIGPIPE, &act, NULL);
	}
}

/*
 * Check if we are above the minimum rate given.
 */
static bool __check_min_rate(struct thread_data *td, struct timespec *now,
			     enum fio_ddir ddir)
{
	unsigned long long bytes = 0;
	unsigned long iops = 0;
	unsigned long spent;
	unsigned long long rate;
	unsigned long long ratemin = 0;
	unsigned int rate_iops = 0;
	unsigned int rate_iops_min = 0;

	assert(ddir_rw(ddir));

	if (!td->o.ratemin[ddir] && !td->o.rate_iops_min[ddir])
		return false;

	/*
	 * allow a 2 second settle period in the beginning
	 */
	if (mtime_since(&td->start, now) < 2000)
		return false;

	iops += td->this_io_blocks[ddir];
	bytes += td->this_io_bytes[ddir];
	ratemin += td->o.ratemin[ddir];
	rate_iops += td->o.rate_iops[ddir];
	rate_iops_min += td->o.rate_iops_min[ddir];

	/*
	 * if rate blocks is set, sample is running
	 */
	if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) {
		spent = mtime_since(&td->lastrate[ddir], now);
		if (spent < td->o.ratecycle)
			return false;

		if (td->o.rate[ddir] || td->o.ratemin[ddir]) {
			/*
			 * check bandwidth specified rate
			 */
			if (bytes < td->rate_bytes[ddir]) {
				log_err("%s: rate_min=%lluB/s not met, only transferred %lluB\n",
					td->o.name, ratemin, bytes);
				return true;
			} else {
				if (spent)
					rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent;
				else
					rate = 0;

				if (rate < ratemin ||
				    bytes < td->rate_bytes[ddir]) {
					log_err("%s: rate_min=%lluB/s not met, got %lluB/s\n",
						td->o.name, ratemin, rate);
					return true;
				}
			}
		} else {
			/*
			 * checks iops specified rate
			 */
			if (iops < rate_iops) {
				log_err("%s: rate_iops_min=%u not met, only performed %lu IOs\n",
					td->o.name, rate_iops, iops);
				return true;
			} else {
				if (spent)
					rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent;
				else
					rate = 0;

				if (rate < rate_iops_min ||
				    iops < td->rate_blocks[ddir]) {
					log_err("%s: rate_iops_min=%u not met, got %llu IOPS\n",
						td->o.name, rate_iops_min, rate);
					return true;
				}
			}
		}
	}

	td->rate_bytes[ddir] = bytes;
	td->rate_blocks[ddir] = iops;
	memcpy(&td->lastrate[ddir], now, sizeof(*now));
	return false;
}

static bool check_min_rate(struct thread_data *td, struct timespec *now)
{
	bool ret = false;

	for_each_rw_ddir(ddir) {
		if (td->bytes_done[ddir])
			ret |= __check_min_rate(td, now, ddir);
	}

	return ret;
}

/*
 * When job exits, we can cancel the in-flight IO if we are using async
 * io. Attempt to do so.
 */
static void cleanup_pending_aio(struct thread_data *td)
{
	int r;

	/*
	 * get immediately available events, if any
	 */
	r = io_u_queued_complete(td, 0);

	/*
	 * now cancel remaining active events
	 */
	if (td->io_ops->cancel) {
		struct io_u *io_u;
		int i;

		io_u_qiter(&td->io_u_all, io_u, i) {
			if (io_u->flags & IO_U_F_FLIGHT) {
				r = td->io_ops->cancel(td, io_u);
				if (!r)
					put_io_u(td, io_u);
			}
		}
	}

	if (td->cur_depth)
		r = io_u_queued_complete(td, td->cur_depth);
}

/*
 * Helper to handle the final sync of a file. Works just like the normal
 * io path, just does everything sync.
 */
static bool fio_io_sync(struct thread_data *td, struct fio_file *f)
{
	struct io_u *io_u = __get_io_u(td);
	enum fio_q_status ret;

	if (!io_u)
		return true;

	io_u->ddir = DDIR_SYNC;
	io_u->file = f;
	io_u_set(td, io_u, IO_U_F_NO_FILE_PUT);

	if (td_io_prep(td, io_u)) {
		put_io_u(td, io_u);
		return true;
	}

requeue:
	ret = td_io_queue(td, io_u);
	switch (ret) {
	case FIO_Q_QUEUED:
		td_io_commit(td);
		if (io_u_queued_complete(td, 1) < 0)
			return true;
		break;
	case FIO_Q_COMPLETED:
		if (io_u->error) {
			td_verror(td, io_u->error, "td_io_queue");
			return true;
		}

		if (io_u_sync_complete(td, io_u) < 0)
			return true;
		break;
	case FIO_Q_BUSY:
		td_io_commit(td);
		goto requeue;
	}

	return false;
}

static int fio_file_fsync(struct thread_data *td, struct fio_file *f)
{
	int ret, ret2;

	if (fio_file_open(f))
		return fio_io_sync(td, f);

	if (td_io_open_file(td, f))
		return 1;

	ret = fio_io_sync(td, f);
	ret2 = 0;
	if (fio_file_open(f))
		ret2 = td_io_close_file(td, f);
	return (ret || ret2);
}

static inline void __update_ts_cache(struct thread_data *td)
{
	fio_gettime(&td->ts_cache, NULL);
}

static inline void update_ts_cache(struct thread_data *td)
{
	if ((++td->ts_cache_nr & td->ts_cache_mask) == td->ts_cache_mask)
		__update_ts_cache(td);
}

static inline bool runtime_exceeded(struct thread_data *td, struct timespec *t)
{
	if (in_ramp_time(td))
		return false;
	if (!td->o.timeout)
		return false;
	if (utime_since(&td->epoch, t) >= td->o.timeout)
		return true;

	return false;
}

/*
 * We need to update the runtime consistently in ms, but keep a running
 * tally of the current elapsed time in microseconds for sub millisecond
 * updates.
 */
static inline void update_runtime(struct thread_data *td,
				  unsigned long long *elapsed_us,
				  const enum fio_ddir ddir)
{
	if (ddir == DDIR_WRITE && td_write(td) && td->o.verify_only)
		return;

	td->ts.runtime[ddir] -= (elapsed_us[ddir] + 999) / 1000;
	elapsed_us[ddir] += utime_since_now(&td->start);
	td->ts.runtime[ddir] += (elapsed_us[ddir] + 999) / 1000;
}

static bool break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
				int *retptr)
{
	int ret = *retptr;

	if (ret < 0 || td->error) {
		int err = td->error;
		enum error_type_bit eb;

		if (ret < 0)
			err = -ret;

		eb = td_error_type(ddir, err);
		if (!(td->o.continue_on_error & (1 << eb)))
			return true;

		if (td_non_fatal_error(td, eb, err)) {
			/*
			 * Continue with the I/Os in case of
			 * a non fatal error.
			 */
			update_error_count(td, err);
			td_clear_error(td);
			*retptr = 0;
			return false;
		} else if (td->o.fill_device && (err == ENOSPC || err == EDQUOT)) {
			/*
			 * We expect to hit this error if
			 * fill_device option is set.
			 */
			td_clear_error(td);
			fio_mark_td_terminate(td);
			return true;
		} else {
			/*
			 * Stop the I/O in case of a fatal
			 * error.
			 */
			update_error_count(td, err);
			return true;
		}
	}

	return false;
}

static void check_update_rusage(struct thread_data *td)
{
	if (td->update_rusage) {
		td->update_rusage = 0;
		update_rusage_stat(td);
		fio_sem_up(td->rusage_sem);
	}
}

static int wait_for_completions(struct thread_data *td, struct timespec *time)
{
	const int full = queue_full(td);
	int min_evts = 0;
	int ret;

	if (td->flags & TD_F_REGROW_LOGS)
		return io_u_quiesce(td);

	/*
	 * if the queue is full, we MUST reap at least 1 event
	 */
	min_evts = min(td->o.iodepth_batch_complete_min, td->cur_depth);
	if ((full && !min_evts) || !td->o.iodepth_batch_complete_min)
		min_evts = 1;

	if (time && should_check_rate(td))
		fio_gettime(time, NULL);

	do {
		ret = io_u_queued_complete(td, min_evts);
		if (ret < 0)
			break;
	} while (full && (td->cur_depth > td->o.iodepth_low));

	return ret;
}

int io_queue_event(struct thread_data *td, struct io_u *io_u, int *ret,
		   enum fio_ddir ddir, uint64_t *bytes_issued, int from_verify,
		   struct timespec *comp_time)
{
	switch (*ret) {
	case FIO_Q_COMPLETED:
		if (io_u->error) {
			*ret = -io_u->error;
			clear_io_u(td, io_u);
		} else if (io_u->resid) {
			long long bytes = io_u->xfer_buflen - io_u->resid;
			struct fio_file *f = io_u->file;

			if (bytes_issued)
				*bytes_issued += bytes;

			if (!from_verify)
				trim_io_piece(io_u);

			/*
			 * zero read, fail
			 */
			if (!bytes) {
				if (!from_verify)
					unlog_io_piece(td, io_u);
				td_verror(td, EIO, "full resid");
				put_io_u(td, io_u);
				break;
			}

			io_u->xfer_buflen = io_u->resid;
			io_u->xfer_buf += bytes;
			io_u->offset += bytes;

			if (ddir_rw(io_u->ddir))
				td->ts.short_io_u[io_u->ddir]++;

			if (io_u->offset == f->real_file_size)
				goto sync_done;

			requeue_io_u(td, &io_u);
		} else {
sync_done:
			if (comp_time && should_check_rate(td))
				fio_gettime(comp_time, NULL);

			*ret = io_u_sync_complete(td, io_u);
			if (*ret < 0)
				break;
		}

		if (td->flags & TD_F_REGROW_LOGS)
			regrow_logs(td);

		/*
		 * when doing I/O (not when verifying),
		 * check for any errors that are to be ignored
		 */
		if (!from_verify)
			break;

		return 0;
	case FIO_Q_QUEUED:
		/*
		 * if the engine doesn't have a commit hook,
		 * the io_u is really queued. if it does have such
		 * a hook, it has to call io_u_queued() itself.
		 */
		if (td->io_ops->commit == NULL)
			io_u_queued(td, io_u);
		if (bytes_issued)
			*bytes_issued += io_u->xfer_buflen;
		break;
	case FIO_Q_BUSY:
		if (!from_verify)
			unlog_io_piece(td, io_u);
		requeue_io_u(td, &io_u);
		td_io_commit(td);
		break;
	default:
		assert(*ret < 0);
		td_verror(td, -(*ret), "td_io_queue");
		break;
	}

	if (break_on_this_error(td, ddir, ret))
		return 1;

	return 0;
}

static inline bool io_in_polling(struct thread_data *td)
{
	return !td->o.iodepth_batch_complete_min &&
	       !td->o.iodepth_batch_complete_max;
}

/*
 * Unlinks files from thread data fio_file structure
 */
static int unlink_all_files(struct thread_data *td)
{
	struct fio_file *f;
	unsigned int i;
	int ret = 0;

	for_each_file(td, f, i) {
		if (f->filetype != FIO_TYPE_FILE)
			continue;
		ret = td_io_unlink_file(td, f);
		if (ret)
			break;
	}

	if (ret)
		td_verror(td, ret, "unlink_all_files");

	return ret;
}

/*
 * Check if io_u will overlap an in-flight IO in the queue
 */
bool in_flight_overlap(struct io_u_queue *q, struct io_u *io_u)
{
	bool overlap;
	struct io_u *check_io_u;
	unsigned long long x1, x2, y1, y2;
	int i;

	x1 = io_u->offset;
	x2 = io_u->offset + io_u->buflen;
	overlap = false;
	io_u_qiter(q, check_io_u, i) {
		if (check_io_u->flags & IO_U_F_FLIGHT) {
			y1 = check_io_u->offset;
			y2 = check_io_u->offset + check_io_u->buflen;

			if (x1 < y2 && y1 < x2) {
				overlap = true;
				dprint(FD_IO, "in-flight overlap: %llu/%llu, %llu/%llu\n",
					x1, io_u->buflen,
					y1, check_io_u->buflen);
				break;
			}
		}
	}

	return overlap;
}

static enum fio_q_status io_u_submit(struct thread_data *td, struct io_u *io_u)
{
	/*
	 * Check for overlap if the user asked us to, and we have
	 * at least one IO in flight besides this one.
	 */
	if (td->o.serialize_overlap && td->cur_depth > 1 &&
	    in_flight_overlap(&td->io_u_all, io_u))
		return FIO_Q_BUSY;

	return td_io_queue(td, io_u);
}

/*
 * The main verify engine. Runs over the writes we previously submitted,
 * reads the blocks back in, and checks the crc/md5 of the data.
 */
static void do_verify(struct thread_data *td, uint64_t verify_bytes)
{
	struct fio_file *f;
	struct io_u *io_u;
	int ret, min_events;
	unsigned int i;

	dprint(FD_VERIFY, "starting loop\n");

	/*
	 * sync io first and invalidate cache, to make sure we really
	 * read from disk.
	 */
	for_each_file(td, f, i) {
		if (!fio_file_open(f))
			continue;
		if (fio_io_sync(td, f))
			break;
		if (file_invalidate_cache(td, f))
			break;
	}

	check_update_rusage(td);

	if (td->error)
		return;

	/*
	 * verify_state needs to be reset before verification
	 * proceeds so that expected random seeds match actual
	 * random seeds in headers. The main loop will reset
	 * all random number generators if randrepeat is set.
	 */
	if (!td->o.rand_repeatable)
		td_fill_verify_state_seed(td);

	td_set_runstate(td, TD_VERIFYING);

	io_u = NULL;
	while (!td->terminate) {
		enum fio_ddir ddir;
		int full;

		update_ts_cache(td);
		check_update_rusage(td);

		if (runtime_exceeded(td, &td->ts_cache)) {
			__update_ts_cache(td);
			if (runtime_exceeded(td, &td->ts_cache)) {
				fio_mark_td_terminate(td);
				break;
			}
		}

		if (flow_threshold_exceeded(td))
			continue;

		if (!td->o.experimental_verify) {
			io_u = __get_io_u(td);
			if (!io_u)
				break;

			if (get_next_verify(td, io_u)) {
				put_io_u(td, io_u);
				break;
			}

			if (td_io_prep(td, io_u)) {
				put_io_u(td, io_u);
				break;
			}
		} else {
			if (ddir_rw_sum(td->bytes_done) + td->o.rw_min_bs > verify_bytes)
				break;

			while ((io_u = get_io_u(td)) != NULL) {
				if (IS_ERR_OR_NULL(io_u)) {
					io_u = NULL;
					ret = FIO_Q_BUSY;
					goto reap;
				}

				/*
				 * We are only interested in the places where
				 * we wrote or trimmed IOs. Turn those into
				 * reads for verification purposes.
				 */
				if (io_u->ddir == DDIR_READ) {
					/*
					 * Pretend we issued it for rwmix
					 * accounting
					 */
					td->io_issues[DDIR_READ]++;
					put_io_u(td, io_u);
					continue;
				} else if (io_u->ddir == DDIR_TRIM) {
					io_u->ddir = DDIR_READ;
					io_u_set(td, io_u, IO_U_F_TRIMMED);
					break;
				} else if (io_u->ddir == DDIR_WRITE) {
					io_u->ddir = DDIR_READ;
					populate_verify_io_u(td, io_u);
					break;
				} else {
					put_io_u(td, io_u);
					continue;
				}
			}

			if (!io_u)
				break;
		}

		if (verify_state_should_stop(td, io_u)) {
			put_io_u(td, io_u);
			break;
		}

		if (td->o.verify_async)
			io_u->end_io = verify_io_u_async;
		else
			io_u->end_io = verify_io_u;

		ddir = io_u->ddir;
		if (!td->o.disable_slat)
			fio_gettime(&io_u->start_time, NULL);

		ret = io_u_submit(td, io_u);

		if (io_queue_event(td, io_u, &ret, ddir, NULL, 1, NULL))
			break;

		/*
		 * if we can queue more, do so. but check if there are
		 * completed io_u's first. Note that we can get BUSY even
		 * without IO queued, if the system is resource starved.
		 */
reap:
		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
		if (full || io_in_polling(td))
			ret = wait_for_completions(td, NULL);

		if (ret < 0)
			break;
	}

	check_update_rusage(td);

	if (!td->error) {
		min_events = td->cur_depth;

		if (min_events)
			ret = io_u_queued_complete(td, min_events);
	} else
		cleanup_pending_aio(td);

	td_set_runstate(td, TD_RUNNING);

	dprint(FD_VERIFY, "exiting loop\n");
}

static bool exceeds_number_ios(struct thread_data *td)
{
	unsigned long long number_ios;

	if (!td->o.number_ios)
		return false;

	number_ios = ddir_rw_sum(td->io_blocks);
	number_ios += td->io_u_queued + td->io_u_in_flight;

	return number_ios >= (td->o.number_ios * td->loops);
}

static bool io_bytes_exceeded(struct thread_data *td, uint64_t *this_bytes)
{
	unsigned long long bytes, limit;

	if (td_rw(td))
		bytes = this_bytes[DDIR_READ] + this_bytes[DDIR_WRITE];
	else if (td_write(td))
		bytes = this_bytes[DDIR_WRITE];
	else if (td_read(td))
		bytes = this_bytes[DDIR_READ];
	else
		bytes = this_bytes[DDIR_TRIM];

	if (td->o.io_size)
		limit = td->o.io_size;
	else
		limit = td->o.size;

	limit *= td->loops;
	return bytes >= limit || exceeds_number_ios(td);
}

static bool io_issue_bytes_exceeded(struct thread_data *td)
{
	return io_bytes_exceeded(td, td->io_issue_bytes);
}

static bool io_complete_bytes_exceeded(struct thread_data *td)
{
	return io_bytes_exceeded(td, td->this_io_bytes);
}

/*
 * used to calculate the next io time for rate control
 *
 */
static long long usec_for_io(struct thread_data *td, enum fio_ddir ddir)
{
	uint64_t bps = td->rate_bps[ddir];

	assert(!(td->flags & TD_F_CHILD));

	if (td->o.rate_process == RATE_PROCESS_POISSON) {
		uint64_t val, iops;

		iops = bps / td->o.min_bs[ddir];
		val = (int64_t) (1000000 / iops) *
				-logf(__rand_0_1(&td->poisson_state[ddir]));
		if (val) {
			dprint(FD_RATE, "poisson rate iops=%llu, ddir=%d\n",
					(unsigned long long) 1000000 / val,
					ddir);
		}
		td->last_usec[ddir] += val;
		return td->last_usec[ddir];
	} else if (bps) {
		uint64_t bytes = td->rate_io_issue_bytes[ddir];
		uint64_t secs = bytes / bps;
		uint64_t remainder = bytes % bps;

		return remainder * 1000000 / bps + secs * 1000000;
	}

	return 0;
}

static void init_thinktime(struct thread_data *td)
{
	if (td->o.thinktime_blocks_type == THINKTIME_BLOCKS_TYPE_COMPLETE)
		td->thinktime_blocks_counter = td->io_blocks;
	else
		td->thinktime_blocks_counter = td->io_issues;
	td->last_thinktime = td->epoch;
	td->last_thinktime_blocks = 0;
}

static void handle_thinktime(struct thread_data *td, enum fio_ddir ddir,
			     struct timespec *time)
{
	unsigned long long b;
	uint64_t total;
	int left;
	struct timespec now;
	bool stall = false;

	if (td->o.thinktime_iotime) {
		fio_gettime(&now, NULL);
		if (utime_since(&td->last_thinktime, &now)
		    >= td->o.thinktime_iotime + td->o.thinktime) {
			stall = true;
		} else if (!fio_option_is_set(&td->o, thinktime_blocks)) {
			/*
			 * When thinktime_iotime is set and thinktime_blocks is
			 * not set, skip the thinktime_blocks check, since
			 * thinktime_blocks default value 1 does not work
			 * together with thinktime_iotime.
			 */
			return;
		}

	}

	b = ddir_rw_sum(td->thinktime_blocks_counter);
	if (b >= td->last_thinktime_blocks + td->o.thinktime_blocks)
		stall = true;

	if (!stall)
		return;

	io_u_quiesce(td);

	total = 0;
	if (td->o.thinktime_spin)
		total = usec_spin(td->o.thinktime_spin);

	left = td->o.thinktime - total;
	if (left)
		total += usec_sleep(td, left);

	/*
	 * If we're ignoring thinktime for the rate, add the number of bytes
	 * we would have done while sleeping, minus one block to ensure we
	 * start issuing immediately after the sleep.
	 */
	if (total && td->rate_bps[ddir] && td->o.rate_ign_think) {
		uint64_t missed = (td->rate_bps[ddir] * total) / 1000000ULL;
		uint64_t bs = td->o.min_bs[ddir];
		uint64_t usperop = bs * 1000000ULL / td->rate_bps[ddir];
		uint64_t over;

		if (usperop <= total)
			over = bs;
		else
			over = (usperop - total) / usperop * -bs;

		td->rate_io_issue_bytes[ddir] += (missed - over);
		/* adjust for rate_process=poisson */
		td->last_usec[ddir] += total;
	}

	if (time && should_check_rate(td))
		fio_gettime(time, NULL);

	td->last_thinktime_blocks = b;
	if (td->o.thinktime_iotime)
		td->last_thinktime = now;
}

/*
 * Main IO worker function. It retrieves io_u's to process and queues
 * and reaps them, checking for rate and errors along the way.
 *
 * Returns number of bytes written and trimmed.
 */
static void do_io(struct thread_data *td, uint64_t *bytes_done)
{
	unsigned int i;
	int ret = 0;
	uint64_t total_bytes, bytes_issued = 0;

	for (i = 0; i < DDIR_RWDIR_CNT; i++)
		bytes_done[i] = td->bytes_done[i];

	if (in_ramp_time(td))
		td_set_runstate(td, TD_RAMP);
	else
		td_set_runstate(td, TD_RUNNING);

	lat_target_init(td);

	total_bytes = td->o.size;
	/*
	 * Allow random overwrite workloads to write up to io_size
	 * before starting verification phase as 'size' doesn't apply.
	 */
	if (td_write(td) && td_random(td) && td->o.norandommap)
		total_bytes = max(total_bytes, (uint64_t) td->o.io_size);
	/*
	 * If verify_backlog is enabled, we'll run the verify in this
	 * handler as well. For that case, we may need up to twice the
	 * amount of bytes.
	 */
	if (td->o.verify != VERIFY_NONE &&
	    (td_write(td) && td->o.verify_backlog))
		total_bytes += td->o.size;

	/* In trimwrite mode, each byte is trimmed and then written, so
	 * allow total_bytes to be twice as big */
	if (td_trimwrite(td))
		total_bytes += td->total_io_size;

	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
		(!flist_empty(&td->trim_list)) || !io_issue_bytes_exceeded(td) ||
		td->o.time_based) {
		struct timespec comp_time;
		struct io_u *io_u;
		int full;
		enum fio_ddir ddir;

		check_update_rusage(td);

		if (td->terminate || td->done)
			break;

		update_ts_cache(td);

		if (runtime_exceeded(td, &td->ts_cache)) {
			__update_ts_cache(td);
			if (runtime_exceeded(td, &td->ts_cache)) {
				fio_mark_td_terminate(td);
				break;
			}
		}

		if (flow_threshold_exceeded(td))
			continue;

		/*
		 * Break if we exceeded the bytes. The exception is time
		 * based runs, but we still need to break out of the loop
		 * for those to run verification, if enabled.
		 * Jobs read from iolog do not use this stop condition.
		 */
		if (bytes_issued >= total_bytes &&
		    !td->o.read_iolog_file &&
		    (!td->o.time_based ||
		     (td->o.time_based && td->o.verify != VERIFY_NONE)))
			break;

		io_u = get_io_u(td);
		if (IS_ERR_OR_NULL(io_u)) {
			int err = PTR_ERR(io_u);

			io_u = NULL;
			ddir = DDIR_INVAL;
			if (err == -EBUSY) {
				ret = FIO_Q_BUSY;
				goto reap;
			}
			if (td->o.latency_target)
				goto reap;
			break;
		}

		if (io_u->ddir == DDIR_WRITE && td->flags & TD_F_DO_VERIFY)
			populate_verify_io_u(td, io_u);

		ddir = io_u->ddir;

		/*
		 * Add verification end_io handler if:
		 * - Asked to verify (!td_rw(td))
		 * - Or the io_u is from our verify list (mixed write/ver)
		 */
		if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ &&
		    ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) {

			if (verify_state_should_stop(td, io_u)) {
				put_io_u(td, io_u);
				break;
			}

			if (td->o.verify_async)
				io_u->end_io = verify_io_u_async;
			else
				io_u->end_io = verify_io_u;
			td_set_runstate(td, TD_VERIFYING);
		} else if (in_ramp_time(td))
			td_set_runstate(td, TD_RAMP);
		else
			td_set_runstate(td, TD_RUNNING);

		/*
		 * Always log IO before it's issued, so we know the specific
		 * order of it. The logged unit will track when the IO has
		 * completed.
		 */
		if (td_write(td) && io_u->ddir == DDIR_WRITE &&
		    td->o.do_verify &&
		    td->o.verify != VERIFY_NONE &&
		    !td->o.experimental_verify)
			log_io_piece(td, io_u);

		if (td->o.io_submit_mode == IO_MODE_OFFLOAD) {
			const unsigned long long blen = io_u->xfer_buflen;
			const enum fio_ddir __ddir = acct_ddir(io_u);

			if (td->error)
				break;

			workqueue_enqueue(&td->io_wq, &io_u->work);
			ret = FIO_Q_QUEUED;

			if (ddir_rw(__ddir)) {
				td->io_issues[__ddir]++;
				td->io_issue_bytes[__ddir] += blen;
				td->rate_io_issue_bytes[__ddir] += blen;
			}

			if (should_check_rate(td))
				td->rate_next_io_time[__ddir] = usec_for_io(td, __ddir);

		} else {
			ret = io_u_submit(td, io_u);

			if (should_check_rate(td))
				td->rate_next_io_time[ddir] = usec_for_io(td, ddir);

			if (io_queue_event(td, io_u, &ret, ddir, &bytes_issued, 0, &comp_time))
				break;

			/*
			 * See if we need to complete some commands. Note that
			 * we can get BUSY even without IO queued, if the
			 * system is resource starved.
			 */
reap:
			full = queue_full(td) ||
				(ret == FIO_Q_BUSY && td->cur_depth);
			if (full || io_in_polling(td))
				ret = wait_for_completions(td, &comp_time);
		}
		if (ret < 0)
			break;

		if (ddir_rw(ddir) && td->o.thinktime)
			handle_thinktime(td, ddir, &comp_time);

		if (!ddir_rw_sum(td->bytes_done) &&
		    !td_ioengine_flagged(td, FIO_NOIO))
			continue;

		if (!in_ramp_time(td) && should_check_rate(td)) {
			if (check_min_rate(td, &comp_time)) {
				if (exitall_on_terminate || td->o.exitall_error)
					fio_terminate_threads(td->groupid, td->o.exit_what);
				td_verror(td, EIO, "check_min_rate");
				break;
			}
		}
		if (!in_ramp_time(td) && td->o.latency_target)
			lat_target_check(td);
	}

	check_update_rusage(td);

	if (td->trim_entries)
		log_err("fio: %lu trim entries leaked?\n", td->trim_entries);

	if (td->o.fill_device && (td->error == ENOSPC || td->error == EDQUOT)) {
		td->error = 0;
		fio_mark_td_terminate(td);
	}
	if (!td->error) {
		struct fio_file *f;

		if (td->o.io_submit_mode == IO_MODE_OFFLOAD) {
			workqueue_flush(&td->io_wq);
			i = 0;
		} else
			i = td->cur_depth;

		if (i) {
			ret = io_u_queued_complete(td, i);
			if (td->o.fill_device &&
			    (td->error == ENOSPC || td->error == EDQUOT))
				td->error = 0;
		}

		if (should_fsync(td) && (td->o.end_fsync || td->o.fsync_on_close)) {
			td_set_runstate(td, TD_FSYNCING);

			for_each_file(td, f, i) {
				if (!fio_file_fsync(td, f))
					continue;

				log_err("fio: end_fsync failed for file %s\n",
					f->file_name);
			}
		}
	} else
		cleanup_pending_aio(td);

	/*
	 * stop job if we failed doing any IO
	 */
	if (!ddir_rw_sum(td->this_io_bytes))
		td->done = 1;

	for (i = 0; i < DDIR_RWDIR_CNT; i++)
		bytes_done[i] = td->bytes_done[i] - bytes_done[i];
}

static void free_file_completion_logging(struct thread_data *td)
{
	struct fio_file *f;
	unsigned int i;

	for_each_file(td, f, i) {
		if (!f->last_write_comp)
			break;
		sfree(f->last_write_comp);
	}
}

static int init_file_completion_logging(struct thread_data *td,
					unsigned int depth)
{
	struct fio_file *f;
	unsigned int i;

	if (td->o.verify == VERIFY_NONE || !td->o.verify_state_save)
		return 0;

	for_each_file(td, f, i) {
		f->last_write_comp = scalloc(depth, sizeof(uint64_t));
		if (!f->last_write_comp)
			goto cleanup;
	}

	return 0;

cleanup:
	free_file_completion_logging(td);
	log_err("fio: failed to alloc write comp data\n");
	return 1;
}

static void cleanup_io_u(struct thread_data *td)
{
	struct io_u *io_u;

	while ((io_u = io_u_qpop(&td->io_u_freelist)) != NULL) {

		if (td->io_ops->io_u_free)
			td->io_ops->io_u_free(td, io_u);

		fio_memfree(io_u, sizeof(*io_u), td_offload_overlap(td));
	}

	free_io_mem(td);

	io_u_rexit(&td->io_u_requeues);
	io_u_qexit(&td->io_u_freelist, false);
	io_u_qexit(&td->io_u_all, td_offload_overlap(td));

	free_file_completion_logging(td);
}

static int init_io_u(struct thread_data *td)
{
	struct io_u *io_u;
	int cl_align, i, max_units;
	int err;

	max_units = td->o.iodepth;

	err = 0;
	err += !io_u_rinit(&td->io_u_requeues, td->o.iodepth);
	err += !io_u_qinit(&td->io_u_freelist, td->o.iodepth, false);
	err += !io_u_qinit(&td->io_u_all, td->o.iodepth, td_offload_overlap(td));

	if (err) {
		log_err("fio: failed setting up IO queues\n");
		return 1;
	}

	cl_align = os_cache_line_size();

	for (i = 0; i < max_units; i++) {
		void *ptr;

		if (td->terminate)
			return 1;

		ptr = fio_memalign(cl_align, sizeof(*io_u), td_offload_overlap(td));
		if (!ptr) {
			log_err("fio: unable to allocate aligned memory\n");
			return 1;
		}

		io_u = ptr;
		memset(io_u, 0, sizeof(*io_u));
		INIT_FLIST_HEAD(&io_u->verify_list);
		dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i);

		io_u->index = i;
		io_u->flags = IO_U_F_FREE;
		io_u_qpush(&td->io_u_freelist, io_u);

		/*
		 * io_u never leaves this stack, used for iteration of all
		 * io_u buffers.
		 */
		io_u_qpush(&td->io_u_all, io_u);

		if (td->io_ops->io_u_init) {
			int ret = td->io_ops->io_u_init(td, io_u);

			if (ret) {
				log_err("fio: failed to init engine data: %d\n", ret);
				return 1;
			}
		}
	}

	init_io_u_buffers(td);

	if (init_file_completion_logging(td, max_units))
		return 1;

	return 0;
}

int init_io_u_buffers(struct thread_data *td)
{
	struct io_u *io_u;
	unsigned long long max_bs, min_write;
	int i, max_units;
	int data_xfer = 1;
	char *p;

	max_units = td->o.iodepth;
	max_bs = td_max_bs(td);
	min_write = td->o.min_bs[DDIR_WRITE];
	td->orig_buffer_size = (unsigned long long) max_bs
					* (unsigned long long) max_units;

	if (td_ioengine_flagged(td, FIO_NOIO) || !(td_read(td) || td_write(td)))
		data_xfer = 0;

	/*
	 * if we may later need to do address alignment, then add any
	 * possible adjustment here so that we don't cause a buffer
	 * overflow later. this adjustment may be too much if we get
	 * lucky and the allocator gives us an aligned address.
	 */
	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
	    td_ioengine_flagged(td, FIO_RAWIO))
		td->orig_buffer_size += page_mask + td->o.mem_align;

	if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) {
		unsigned long long bs;

		bs = td->orig_buffer_size + td->o.hugepage_size - 1;
		td->orig_buffer_size = bs & ~(td->o.hugepage_size - 1);
	}

	if (td->orig_buffer_size != (size_t) td->orig_buffer_size) {
		log_err("fio: IO memory too large. Reduce max_bs or iodepth\n");
		return 1;
	}

	if (data_xfer && allocate_io_mem(td))
		return 1;

	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
	    td_ioengine_flagged(td, FIO_RAWIO))
		p = PTR_ALIGN(td->orig_buffer, page_mask) + td->o.mem_align;
	else
		p = td->orig_buffer;

	for (i = 0; i < max_units; i++) {
		io_u = td->io_u_all.io_us[i];
		dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i);

		if (data_xfer) {
			io_u->buf = p;
			dprint(FD_MEM, "io_u %p, mem %p\n", io_u, io_u->buf);

			if (td_write(td))
				io_u_fill_buffer(td, io_u, min_write, max_bs);
			if (td_write(td) && td->o.verify_pattern_bytes) {
				/*
				 * Fill the buffer with the pattern if we are
				 * going to be doing writes.
				 */
				fill_verify_pattern(td, io_u->buf, max_bs, io_u, 0, 0);
			}
		}
		p += max_bs;
	}

	return 0;
}

#ifdef FIO_HAVE_IOSCHED_SWITCH
/*
 * These functions are Linux specific.
 * FIO_HAVE_IOSCHED_SWITCH enabled currently means it's Linux.
 */
static int set_ioscheduler(struct thread_data *td, struct fio_file *file)
{
	char tmp[256], tmp2[128], *p;
	FILE *f;
	int ret;

	assert(file->du && file->du->sysfs_root);
	sprintf(tmp, "%s/queue/scheduler", file->du->sysfs_root);

	f = fopen(tmp, "r+");
	if (!f) {
		if (errno == ENOENT) {
			log_err("fio: os or kernel doesn't support IO scheduler"
				" switching\n");
			return 0;
		}
		td_verror(td, errno, "fopen iosched");
		return 1;
	}

	/*
	 * Set io scheduler.
	 */
	ret = fwrite(td->o.ioscheduler, strlen(td->o.ioscheduler), 1, f);
	if (ferror(f) || ret != 1) {
		td_verror(td, errno, "fwrite");
		fclose(f);
		return 1;
	}

	rewind(f);

	/*
	 * Read back and check that the selected scheduler is now the default.
	 */
	ret = fread(tmp, 1, sizeof(tmp) - 1, f);
	if (ferror(f) || ret < 0) {
		td_verror(td, errno, "fread");
		fclose(f);
		return 1;
	}
	tmp[ret] = '\0';
	/*
	 * either a list of io schedulers or "none\n" is expected. Strip the
	 * trailing newline.
	 */
	p = tmp;
	strsep(&p, "\n");

	/*
	 * Write to "none" entry doesn't fail, so check the result here.
	 */
	if (!strcmp(tmp, "none")) {
		log_err("fio: io scheduler is not tunable\n");
		fclose(f);
		return 0;
	}

	sprintf(tmp2, "[%s]", td->o.ioscheduler);
	if (!strstr(tmp, tmp2)) {
		log_err("fio: unable to set io scheduler to %s\n", td->o.ioscheduler);
		td_verror(td, EINVAL, "iosched_switch");
		fclose(f);
		return 1;
	}

	fclose(f);
	return 0;
}

static int switch_ioscheduler(struct thread_data *td)
{
	struct fio_file *f;
	unsigned int i;
	int ret = 0;

	if (td_ioengine_flagged(td, FIO_DISKLESSIO))
		return 0;

	assert(td->files && td->files[0]);

	for_each_file(td, f, i) {

		/* Only consider regular files and block device files */
		switch (f->filetype) {
		case FIO_TYPE_FILE:
		case FIO_TYPE_BLOCK:
			/*
			 * Make sure that the device hosting the file could
			 * be determined.
			 */
			if (!f->du)
				continue;
			break;
		case FIO_TYPE_CHAR:
		case FIO_TYPE_PIPE:
		default:
			continue;
		}

		ret = set_ioscheduler(td, f);
		if (ret)
			return ret;
	}

	return 0;
}

#else

static int switch_ioscheduler(struct thread_data *td)
{
	return 0;
}

#endif /* FIO_HAVE_IOSCHED_SWITCH */

static bool keep_running(struct thread_data *td)
{
	unsigned long long limit;

	if (td->done)
		return false;
	if (td->terminate)
		return false;
	if (td->o.time_based)
		return true;
	if (td->o.loops) {
		td->o.loops--;
		return true;
	}
	if (exceeds_number_ios(td))
		return false;

	if (td->o.io_size)
		limit = td->o.io_size;
	else
		limit = td->o.size;

	if (limit != -1ULL && ddir_rw_sum(td->io_bytes) < limit) {
		uint64_t diff;

		/*
		 * If the difference is less than the maximum IO size, we
		 * are done.
		 */
		diff = limit - ddir_rw_sum(td->io_bytes);
		if (diff < td_max_bs(td))
			return false;

		if (fio_files_done(td) && !td->o.io_size)
			return false;

		return true;
	}

	return false;
}

static int exec_string(struct thread_options *o, const char *string,
		       const char *mode)
{
	int ret;
	char *str;

	if (asprintf(&str, "%s > %s.%s.txt 2>&1", string, o->name, mode) < 0)
		return -1;

	log_info("%s : Saving output of %s in %s.%s.txt\n", o->name, mode,
		 o->name, mode);
	ret = system(str);
	if (ret == -1)
		log_err("fio: exec of cmd <%s> failed\n", str);

	free(str);
	return ret;
}

/*
 * Dry run to compute correct state of numberio for verification.
 */
static uint64_t do_dry_run(struct thread_data *td)
{
	td_set_runstate(td, TD_RUNNING);

	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
		(!flist_empty(&td->trim_list)) || !io_complete_bytes_exceeded(td)) {
		struct io_u *io_u;
		int ret;

		if (td->terminate || td->done)
			break;

		io_u = get_io_u(td);
		if (IS_ERR_OR_NULL(io_u))
			break;

		io_u_set(td, io_u, IO_U_F_FLIGHT);
		io_u->error = 0;
		io_u->resid = 0;
		if (ddir_rw(acct_ddir(io_u)))
			td->io_issues[acct_ddir(io_u)]++;
		if (ddir_rw(io_u->ddir)) {
			io_u_mark_depth(td, 1);
			td->ts.total_io_u[io_u->ddir]++;
		}

		if (td_write(td) && io_u->ddir == DDIR_WRITE &&
		    td->o.do_verify &&
		    td->o.verify != VERIFY_NONE &&
		    !td->o.experimental_verify)
			log_io_piece(td, io_u);

		ret = io_u_sync_complete(td, io_u);
		(void) ret;
	}

	return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM];
}

struct fork_data {
	struct thread_data *td;
	struct sk_out *sk_out;
};

/*
 * Entry point for the thread based jobs. The process based jobs end up
 * here as well, after a little setup.
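 * (For process-based jobs, run_threads() fork()s and the child then calls
 * thread_main() directly.)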
 */
static void *thread_main(void *data)
{
	struct fork_data *fd = data;
	unsigned long long elapsed_us[DDIR_RWDIR_CNT] = { 0, };
	struct thread_data *td = fd->td;
	struct thread_options *o = &td->o;
	struct sk_out *sk_out = fd->sk_out;
	uint64_t bytes_done[DDIR_RWDIR_CNT];
	int deadlock_loop_cnt;
	bool clear_state;
	int res, ret;

	sk_out_assign(sk_out);
	free(fd);

	if (!o->use_thread) {
		setsid();
		td->pid = getpid();
	} else
		td->pid = gettid();

	fio_local_clock_init();

	dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid);

	if (is_backend)
		fio_server_send_start(td);

	INIT_FLIST_HEAD(&td->io_log_list);
	INIT_FLIST_HEAD(&td->io_hist_list);
	INIT_FLIST_HEAD(&td->verify_list);
	INIT_FLIST_HEAD(&td->trim_list);
	td->io_hist_tree = RB_ROOT;

	ret = mutex_cond_init_pshared(&td->io_u_lock, &td->free_cond);
	if (ret) {
		td_verror(td, ret, "mutex_cond_init_pshared");
		goto err;
	}
	ret = cond_init_pshared(&td->verify_cond);
	if (ret) {
		td_verror(td, ret, "mutex_cond_pshared");
		goto err;
	}

	td_set_runstate(td, TD_INITIALIZED);
	dprint(FD_MUTEX, "up startup_sem\n");
	fio_sem_up(startup_sem);
	dprint(FD_MUTEX, "wait on td->sem\n");
	fio_sem_down(td->sem);
	dprint(FD_MUTEX, "done waiting on td->sem\n");

	/*
	 * A new gid requires privilege, so we need to do this before setting
	 * the uid.
	 */
	if (o->gid != -1U && setgid(o->gid)) {
		td_verror(td, errno, "setgid");
		goto err;
	}
	if (o->uid != -1U && setuid(o->uid)) {
		td_verror(td, errno, "setuid");
		goto err;
	}

	td_zone_gen_index(td);

	/*
	 * Do this early, we don't want the compress threads to be limited
	 * to the same CPUs as the IO workers. So do this before we set
	 * any potential CPU affinity
	 */
	if (iolog_compress_init(td, sk_out))
		goto err;

	/*
	 * If we have a gettimeofday() thread, make sure we exclude that
	 * thread from this job
	 */
	if (o->gtod_cpu)
		fio_cpu_clear(&o->cpumask, o->gtod_cpu);

	/*
	 * Set affinity first, in case it has an impact on the memory
	 * allocations.
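	 * (One likely reason: with first-touch NUMA placement, memory tends
	 * to be allocated near the CPUs the job is pinned to.)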
	 */
	if (fio_option_is_set(o, cpumask)) {
		if (o->cpus_allowed_policy == FIO_CPUS_SPLIT) {
			ret = fio_cpus_split(&o->cpumask, td->thread_number - 1);
			if (!ret) {
				log_err("fio: no CPUs set\n");
				log_err("fio: Try increasing number of available CPUs\n");
				td_verror(td, EINVAL, "cpus_split");
				goto err;
			}
		}
		ret = fio_setaffinity(td->pid, o->cpumask);
		if (ret == -1) {
			td_verror(td, errno, "cpu_set_affinity");
			goto err;
		}
	}

#ifdef CONFIG_LIBNUMA
	/* numa node setup */
	if (fio_option_is_set(o, numa_cpunodes) ||
	    fio_option_is_set(o, numa_memnodes)) {
		struct bitmask *mask;

		if (numa_available() < 0) {
			td_verror(td, errno, "Does not support NUMA API\n");
			goto err;
		}

		if (fio_option_is_set(o, numa_cpunodes)) {
			mask = numa_parse_nodestring(o->numa_cpunodes);
			ret = numa_run_on_node_mask(mask);
			numa_free_nodemask(mask);
			if (ret == -1) {
				td_verror(td, errno,
					  "numa_run_on_node_mask failed\n");
				goto err;
			}
		}

		if (fio_option_is_set(o, numa_memnodes)) {
			mask = NULL;
			if (o->numa_memnodes)
				mask = numa_parse_nodestring(o->numa_memnodes);

			switch (o->numa_mem_mode) {
			case MPOL_INTERLEAVE:
				numa_set_interleave_mask(mask);
				break;
			case MPOL_BIND:
				numa_set_membind(mask);
				break;
			case MPOL_LOCAL:
				numa_set_localalloc();
				break;
			case MPOL_PREFERRED:
				numa_set_preferred(o->numa_mem_prefer_node);
				break;
			case MPOL_DEFAULT:
			default:
				break;
			}

			if (mask)
				numa_free_nodemask(mask);

		}
	}
#endif

	if (fio_pin_memory(td))
		goto err;

	/*
	 * May alter parameters that init_io_u() will use, so we need to
	 * do this first.
	 */
	if (!init_iolog(td))
		goto err;

	if (td_io_init(td))
		goto err;

	if (init_io_u(td))
		goto err;

	if (td->io_ops->post_init && td->io_ops->post_init(td))
		goto err;

	if (o->verify_async && verify_async_init(td))
		goto err;

	if (fio_option_is_set(o, ioprio) ||
	    fio_option_is_set(o, ioprio_class)) {
		ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, o->ioprio);
		if (ret == -1) {
			td_verror(td, errno, "ioprio_set");
			goto err;
		}
		td->ioprio = ioprio_value(o->ioprio_class, o->ioprio);
	}

	if (o->cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt))
		goto err;

	errno = 0;
	if (nice(o->nice) == -1 && errno != 0) {
		td_verror(td, errno, "nice");
		goto err;
	}

	if (o->ioscheduler && switch_ioscheduler(td))
		goto err;

	if (!o->create_serialize && setup_files(td))
		goto err;

	if (!init_random_map(td))
		goto err;

	if (o->exec_prerun && exec_string(o, o->exec_prerun, "prerun"))
		goto err;

	if (o->pre_read && !pre_read_files(td))
		goto err;

	fio_verify_init(td);

	if (rate_submit_init(td, sk_out))
		goto err;

	set_epoch_time(td, o->log_unix_epoch);
	fio_getrusage(&td->ru_start);
	memcpy(&td->bw_sample_time, &td->epoch, sizeof(td->epoch));
	memcpy(&td->iops_sample_time, &td->epoch, sizeof(td->epoch));
	memcpy(&td->ss.prev_time, &td->epoch, sizeof(td->epoch));

	init_thinktime(td);

	if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] ||
	    o->ratemin[DDIR_TRIM]) {
		memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time,
		       sizeof(td->bw_sample_time));
		memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time,
		       sizeof(td->bw_sample_time));
		memcpy(&td->lastrate[DDIR_TRIM], &td->bw_sample_time,
		       sizeof(td->bw_sample_time));
	}

	memset(bytes_done, 0, sizeof(bytes_done));
	clear_state = false;

	while (keep_running(td)) {
		uint64_t verify_bytes;

		fio_gettime(&td->start, NULL);
		memcpy(&td->ts_cache, &td->start, sizeof(td->start));

		if (clear_state) {
			clear_io_state(td, 0);

			if (o->unlink_each_loop && unlink_all_files(td))
				break;
		}

		prune_io_piece_log(td);

		if (td->o.verify_only && td_write(td))
			verify_bytes = do_dry_run(td);
		else {
			do_io(td, bytes_done);

			if (!ddir_rw_sum(bytes_done)) {
				fio_mark_td_terminate(td);
				verify_bytes = 0;
			} else {
				verify_bytes = bytes_done[DDIR_WRITE] +
						bytes_done[DDIR_TRIM];
			}
		}

		/*
		 * If we took too long to shut down, the main thread could
		 * already consider us reaped/exited. If that happens, break
		 * out and clean up.
		 */
		if (td->runstate >= TD_EXITED)
			break;

		clear_state = true;

		/*
		 * Make sure we've successfully updated the rusage stats
		 * before waiting on the stat mutex. Otherwise we could have
		 * the stat thread holding stat mutex and waiting for
		 * the rusage_sem, which would never get upped because
		 * this thread is waiting for the stat mutex.
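		 * The trylock-and-sleep loop below retries every 1ms and gives
		 * up after roughly five seconds, failing the job with EDEADLK
		 * instead of hanging.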
		 */
		deadlock_loop_cnt = 0;
		do {
			check_update_rusage(td);
			if (!fio_sem_down_trylock(stat_sem))
				break;
			usleep(1000);
			if (deadlock_loop_cnt++ > 5000) {
				log_err("fio seems to be stuck grabbing stat_sem, forcibly exiting\n");
				td->error = EDEADLK;
				goto err;
			}
		} while (1);

		if (td_read(td) && td->io_bytes[DDIR_READ])
			update_runtime(td, elapsed_us, DDIR_READ);
		if (td_write(td) && td->io_bytes[DDIR_WRITE])
			update_runtime(td, elapsed_us, DDIR_WRITE);
		if (td_trim(td) && td->io_bytes[DDIR_TRIM])
			update_runtime(td, elapsed_us, DDIR_TRIM);
		fio_gettime(&td->start, NULL);
		fio_sem_up(stat_sem);

		if (td->error || td->terminate)
			break;

		if (!o->do_verify ||
		    o->verify == VERIFY_NONE ||
		    td_ioengine_flagged(td, FIO_UNIDIR))
			continue;

		clear_io_state(td, 0);

		fio_gettime(&td->start, NULL);

		do_verify(td, verify_bytes);

		/*
		 * See comment further up for why this is done here.
		 */
		check_update_rusage(td);

		fio_sem_down(stat_sem);
		update_runtime(td, elapsed_us, DDIR_READ);
		fio_gettime(&td->start, NULL);
		fio_sem_up(stat_sem);

		if (td->error || td->terminate)
			break;
	}

	/*
	 * Acquire this lock if we were doing overlap checking in
	 * offload mode so that we don't clean up this job while
	 * another thread is checking its io_u's for overlap
	 */
	if (td_offload_overlap(td)) {
		int res = pthread_mutex_lock(&overlap_check);
		assert(res == 0);
	}
	td_set_runstate(td, TD_FINISHING);
	if (td_offload_overlap(td)) {
		res = pthread_mutex_unlock(&overlap_check);
		assert(res == 0);
	}

	update_rusage_stat(td);
	td->ts.total_run_time = mtime_since_now(&td->epoch);
	for_each_rw_ddir(ddir) {
		td->ts.io_bytes[ddir] = td->io_bytes[ddir];
	}

	if (td->o.verify_state_save && !(td->flags & TD_F_VSTATE_SAVED) &&
	    (td->o.verify != VERIFY_NONE && td_write(td)))
		verify_save_state(td->thread_number);

	fio_unpin_memory(td);

	td_writeout_logs(td, true);

	iolog_compress_exit(td);
	rate_submit_exit(td);

	if (o->exec_postrun)
		exec_string(o, o->exec_postrun, "postrun");

	if (exitall_on_terminate || (o->exitall_error && td->error))
		fio_terminate_threads(td->groupid, td->o.exit_what);

err:
	if (td->error)
		log_info("fio: pid=%d, err=%d/%s\n", (int) td->pid, td->error,
			 td->verror);

	if (o->verify_async)
		verify_async_exit(td);

	close_and_free_files(td);
	cleanup_io_u(td);
	close_ioengine(td);
	cgroup_shutdown(td, cgroup_mnt);
	verify_free_state(td);
	td_zone_free_index(td);

	if (fio_option_is_set(o, cpumask)) {
		ret = fio_cpuset_exit(&o->cpumask);
		if (ret)
			td_verror(td, ret, "fio_cpuset_exit");
	}

	/*
	 * do this very late, it will log file closing as well
	 */
	if (o->write_iolog_file)
		write_iolog_close(td);
	if (td->io_log_rfile)
		fclose(td->io_log_rfile);

	td_set_runstate(td, TD_EXITED);

	/*
	 * Do this last after setting our runstate to exited, so we
	 * know that the stat thread is signaled.
	 */
	check_update_rusage(td);

	sk_out_drop();
	return (void *) (uintptr_t) td->error;
}

/*
 * Run over the job map and reap the threads that have exited, if any.
 */
static void reap_threads(unsigned int *nr_running, uint64_t *t_rate,
			 uint64_t *m_rate)
{
	struct thread_data *td;
	unsigned int cputhreads, realthreads, pending;
	int i, status, ret;

	/*
	 * reap exited threads (TD_EXITED -> TD_REAPED)
	 */
	realthreads = pending = cputhreads = 0;
	for_each_td(td, i) {
		int flags = 0;

		if (!strcmp(td->o.ioengine, "cpuio"))
			cputhreads++;
		else
			realthreads++;

		if (!td->pid) {
			pending++;
			continue;
		}
		if (td->runstate == TD_REAPED)
			continue;
		if (td->o.use_thread) {
			if (td->runstate == TD_EXITED) {
				td_set_runstate(td, TD_REAPED);
				goto reaped;
			}
			continue;
		}

		flags = WNOHANG;
		if (td->runstate == TD_EXITED)
			flags = 0;

		/*
		 * check if someone quit or got killed in an unusual way
		 */
		ret = waitpid(td->pid, &status, flags);
		if (ret < 0) {
			if (errno == ECHILD) {
				log_err("fio: pid=%d disappeared %d\n",
					(int) td->pid, td->runstate);
				td->sig = ECHILD;
				td_set_runstate(td, TD_REAPED);
				goto reaped;
			}
			perror("waitpid");
		} else if (ret == td->pid) {
			if (WIFSIGNALED(status)) {
				int sig = WTERMSIG(status);

				if (sig != SIGTERM && sig != SIGUSR2)
					log_err("fio: pid=%d, got signal=%d\n",
						(int) td->pid, sig);
				td->sig = sig;
				td_set_runstate(td, TD_REAPED);
				goto reaped;
			}
			if (WIFEXITED(status)) {
				if (WEXITSTATUS(status) && !td->error)
					td->error = WEXITSTATUS(status);

				td_set_runstate(td, TD_REAPED);
				goto reaped;
			}
		}

		/*
		 * If the job is stuck, do a forceful timeout of it and
		 * move on.
		 */
		if (td->terminate &&
		    td->runstate < TD_FSYNCING &&
		    time_since_now(&td->terminate_time) >= FIO_REAP_TIMEOUT) {
			log_err("fio: job '%s' (state=%d) hasn't exited in "
				"%lu seconds, it appears to be stuck. "
Doing " 2109 "forceful exit of this job.\n", 2110 td->o.name, td->runstate, 2111 (unsigned long) time_since_now(&td->terminate_time)); 2112 td_set_runstate(td, TD_REAPED); 2113 goto reaped; 2114 } 2115 2116 /* 2117 * thread is not dead, continue 2118 */ 2119 pending++; 2120 continue; 2121 reaped: 2122 (*nr_running)--; 2123 (*m_rate) -= ddir_rw_sum(td->o.ratemin); 2124 (*t_rate) -= ddir_rw_sum(td->o.rate); 2125 if (!td->pid) 2126 pending--; 2127 2128 if (td->error) 2129 exit_value++; 2130 2131 done_secs += mtime_since_now(&td->epoch) / 1000; 2132 profile_td_exit(td); 2133 flow_exit_job(td); 2134 } 2135 2136 if (*nr_running == cputhreads && !pending && realthreads) 2137 fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL); 2138 } 2139 2140 static bool __check_trigger_file(void) 2141 { 2142 struct stat sb; 2143 2144 if (!trigger_file) 2145 return false; 2146 2147 if (stat(trigger_file, &sb)) 2148 return false; 2149 2150 if (unlink(trigger_file) < 0) 2151 log_err("fio: failed to unlink %s: %s\n", trigger_file, 2152 strerror(errno)); 2153 2154 return true; 2155 } 2156 2157 static bool trigger_timedout(void) 2158 { 2159 if (trigger_timeout) 2160 if (time_since_genesis() >= trigger_timeout) { 2161 trigger_timeout = 0; 2162 return true; 2163 } 2164 2165 return false; 2166 } 2167 2168 void exec_trigger(const char *cmd) 2169 { 2170 int ret; 2171 2172 if (!cmd || cmd[0] == '\0') 2173 return; 2174 2175 ret = system(cmd); 2176 if (ret == -1) 2177 log_err("fio: failed executing %s trigger\n", cmd); 2178 } 2179 2180 void check_trigger_file(void) 2181 { 2182 if (__check_trigger_file() || trigger_timedout()) { 2183 if (nr_clients) 2184 fio_clients_send_trigger(trigger_remote_cmd); 2185 else { 2186 verify_save_state(IO_LIST_ALL); 2187 fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL); 2188 exec_trigger(trigger_cmd); 2189 } 2190 } 2191 } 2192 2193 static int fio_verify_load_state(struct thread_data *td) 2194 { 2195 int ret; 2196 2197 if (!td->o.verify_state) 2198 return 0; 2199 2200 if (is_backend) { 2201 void *data; 2202 2203 ret = fio_server_get_verify_state(td->o.name, 2204 td->thread_number - 1, &data); 2205 if (!ret) 2206 verify_assign_state(td, data); 2207 } else { 2208 char prefix[PATH_MAX]; 2209 2210 if (aux_path) 2211 sprintf(prefix, "%s%clocal", aux_path, 2212 FIO_OS_PATH_SEPARATOR); 2213 else 2214 strcpy(prefix, "local"); 2215 ret = verify_load_state(td, prefix); 2216 } 2217 2218 return ret; 2219 } 2220 2221 static void do_usleep(unsigned int usecs) 2222 { 2223 check_for_running_stats(); 2224 check_trigger_file(); 2225 usleep(usecs); 2226 } 2227 2228 static bool check_mount_writes(struct thread_data *td) 2229 { 2230 struct fio_file *f; 2231 unsigned int i; 2232 2233 if (!td_write(td) || td->o.allow_mounted_write) 2234 return false; 2235 2236 /* 2237 * If FIO_HAVE_CHARDEV_SIZE is defined, it's likely that chrdevs 2238 * are mkfs'd and mounted. 2239 */ 2240 for_each_file(td, f, i) { 2241 #ifdef FIO_HAVE_CHARDEV_SIZE 2242 if (f->filetype != FIO_TYPE_BLOCK && f->filetype != FIO_TYPE_CHAR) 2243 #else 2244 if (f->filetype != FIO_TYPE_BLOCK) 2245 #endif 2246 continue; 2247 if (device_is_mounted(f->file_name)) 2248 goto mounted; 2249 } 2250 2251 return false; 2252 mounted: 2253 log_err("fio: %s appears mounted, and 'allow_mounted_write' isn't set. 
Aborting.\n", f->file_name); 2254 return true; 2255 } 2256 2257 static bool waitee_running(struct thread_data *me) 2258 { 2259 const char *waitee = me->o.wait_for; 2260 const char *self = me->o.name; 2261 struct thread_data *td; 2262 int i; 2263 2264 if (!waitee) 2265 return false; 2266 2267 for_each_td(td, i) { 2268 if (!strcmp(td->o.name, self) || strcmp(td->o.name, waitee)) 2269 continue; 2270 2271 if (td->runstate < TD_EXITED) { 2272 dprint(FD_PROCESS, "%s fenced by %s(%s)\n", 2273 self, td->o.name, 2274 runstate_to_name(td->runstate)); 2275 return true; 2276 } 2277 } 2278 2279 dprint(FD_PROCESS, "%s: %s completed, can run\n", self, waitee); 2280 return false; 2281 } 2282 2283 /* 2284 * Main function for kicking off and reaping jobs, as needed. 2285 */ 2286 static void run_threads(struct sk_out *sk_out) 2287 { 2288 struct thread_data *td; 2289 unsigned int i, todo, nr_running, nr_started; 2290 uint64_t m_rate, t_rate; 2291 uint64_t spent; 2292 2293 if (fio_gtod_offload && fio_start_gtod_thread()) 2294 return; 2295 2296 fio_idle_prof_init(); 2297 2298 set_sig_handlers(); 2299 2300 nr_thread = nr_process = 0; 2301 for_each_td(td, i) { 2302 if (check_mount_writes(td)) 2303 return; 2304 if (td->o.use_thread) 2305 nr_thread++; 2306 else 2307 nr_process++; 2308 } 2309 2310 if (output_format & FIO_OUTPUT_NORMAL) { 2311 struct buf_output out; 2312 2313 buf_output_init(&out); 2314 __log_buf(&out, "Starting "); 2315 if (nr_thread) 2316 __log_buf(&out, "%d thread%s", nr_thread, 2317 nr_thread > 1 ? "s" : ""); 2318 if (nr_process) { 2319 if (nr_thread) 2320 __log_buf(&out, " and "); 2321 __log_buf(&out, "%d process%s", nr_process, 2322 nr_process > 1 ? "es" : ""); 2323 } 2324 __log_buf(&out, "\n"); 2325 log_info_buf(out.buf, out.buflen); 2326 buf_output_free(&out); 2327 } 2328 2329 todo = thread_number; 2330 nr_running = 0; 2331 nr_started = 0; 2332 m_rate = t_rate = 0; 2333 2334 for_each_td(td, i) { 2335 print_status_init(td->thread_number - 1); 2336 2337 if (!td->o.create_serialize) 2338 continue; 2339 2340 if (fio_verify_load_state(td)) 2341 goto reap; 2342 2343 /* 2344 * do file setup here so it happens sequentially, 2345 * we don't want X number of threads getting their 2346 * client data interspersed on disk 2347 */ 2348 if (setup_files(td)) { 2349 reap: 2350 exit_value++; 2351 if (td->error) 2352 log_err("fio: pid=%d, err=%d/%s\n", 2353 (int) td->pid, td->error, td->verror); 2354 td_set_runstate(td, TD_REAPED); 2355 todo--; 2356 } else { 2357 struct fio_file *f; 2358 unsigned int j; 2359 2360 /* 2361 * for sharing to work, each job must always open 2362 * its own files. 
so close them, if we opened them 2363 * for creation 2364 */ 2365 for_each_file(td, f, j) { 2366 if (fio_file_open(f)) 2367 td_io_close_file(td, f); 2368 } 2369 } 2370 } 2371 2372 /* start idle threads before io threads start to run */ 2373 fio_idle_prof_start(); 2374 2375 set_genesis_time(); 2376 2377 while (todo) { 2378 struct thread_data *map[REAL_MAX_JOBS]; 2379 struct timespec this_start; 2380 int this_jobs = 0, left; 2381 struct fork_data *fd; 2382 2383 /* 2384 * create threads (TD_NOT_CREATED -> TD_CREATED) 2385 */ 2386 for_each_td(td, i) { 2387 if (td->runstate != TD_NOT_CREATED) 2388 continue; 2389 2390 /* 2391 * never got a chance to start, killed by other 2392 * thread for some reason 2393 */ 2394 if (td->terminate) { 2395 todo--; 2396 continue; 2397 } 2398 2399 if (td->o.start_delay) { 2400 spent = utime_since_genesis(); 2401 2402 if (td->o.start_delay > spent) 2403 continue; 2404 } 2405 2406 if (td->o.stonewall && (nr_started || nr_running)) { 2407 dprint(FD_PROCESS, "%s: stonewall wait\n", 2408 td->o.name); 2409 break; 2410 } 2411 2412 if (waitee_running(td)) { 2413 dprint(FD_PROCESS, "%s: waiting for %s\n", 2414 td->o.name, td->o.wait_for); 2415 continue; 2416 } 2417 2418 init_disk_util(td); 2419 2420 td->rusage_sem = fio_sem_init(FIO_SEM_LOCKED); 2421 td->update_rusage = 0; 2422 2423 /* 2424 * Set state to created. Thread will transition 2425 * to TD_INITIALIZED when it's done setting up. 2426 */ 2427 td_set_runstate(td, TD_CREATED); 2428 map[this_jobs++] = td; 2429 nr_started++; 2430 2431 fd = calloc(1, sizeof(*fd)); 2432 fd->td = td; 2433 fd->sk_out = sk_out; 2434 2435 if (td->o.use_thread) { 2436 int ret; 2437 2438 dprint(FD_PROCESS, "will pthread_create\n"); 2439 ret = pthread_create(&td->thread, NULL, 2440 thread_main, fd); 2441 if (ret) { 2442 log_err("pthread_create: %s\n", 2443 strerror(ret)); 2444 free(fd); 2445 nr_started--; 2446 break; 2447 } 2448 fd = NULL; 2449 ret = pthread_detach(td->thread); 2450 if (ret) 2451 log_err("pthread_detach: %s", 2452 strerror(ret)); 2453 } else { 2454 pid_t pid; 2455 dprint(FD_PROCESS, "will fork\n"); 2456 pid = fork(); 2457 if (!pid) { 2458 int ret; 2459 2460 ret = (int)(uintptr_t)thread_main(fd); 2461 _exit(ret); 2462 } else if (i == fio_debug_jobno) 2463 *fio_debug_jobp = pid; 2464 } 2465 dprint(FD_MUTEX, "wait on startup_sem\n"); 2466 if (fio_sem_down_timeout(startup_sem, 10000)) { 2467 log_err("fio: job startup hung? exiting.\n"); 2468 fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL); 2469 fio_abort = true; 2470 nr_started--; 2471 free(fd); 2472 break; 2473 } 2474 dprint(FD_MUTEX, "done waiting on startup_sem\n"); 2475 } 2476 2477 /* 2478 * Wait for the started threads to transition to 2479 * TD_INITIALIZED. 2480 */ 2481 fio_gettime(&this_start, NULL); 2482 left = this_jobs; 2483 while (left && !fio_abort) { 2484 if (mtime_since_now(&this_start) > JOB_START_TIMEOUT) 2485 break; 2486 2487 do_usleep(100000); 2488 2489 for (i = 0; i < this_jobs; i++) { 2490 td = map[i]; 2491 if (!td) 2492 continue; 2493 if (td->runstate == TD_INITIALIZED) { 2494 map[i] = NULL; 2495 left--; 2496 } else if (td->runstate >= TD_EXITED) { 2497 map[i] = NULL; 2498 left--; 2499 todo--; 2500 nr_running++; /* work-around... */ 2501 } 2502 } 2503 } 2504 2505 if (left) { 2506 log_err("fio: %d job%s failed to start\n", left, 2507 left > 1 ? 
"s" : ""); 2508 for (i = 0; i < this_jobs; i++) { 2509 td = map[i]; 2510 if (!td) 2511 continue; 2512 kill(td->pid, SIGTERM); 2513 } 2514 break; 2515 } 2516 2517 /* 2518 * start created threads (TD_INITIALIZED -> TD_RUNNING). 2519 */ 2520 for_each_td(td, i) { 2521 if (td->runstate != TD_INITIALIZED) 2522 continue; 2523 2524 if (in_ramp_time(td)) 2525 td_set_runstate(td, TD_RAMP); 2526 else 2527 td_set_runstate(td, TD_RUNNING); 2528 nr_running++; 2529 nr_started--; 2530 m_rate += ddir_rw_sum(td->o.ratemin); 2531 t_rate += ddir_rw_sum(td->o.rate); 2532 todo--; 2533 fio_sem_up(td->sem); 2534 } 2535 2536 reap_threads(&nr_running, &t_rate, &m_rate); 2537 2538 if (todo) 2539 do_usleep(100000); 2540 } 2541 2542 while (nr_running) { 2543 reap_threads(&nr_running, &t_rate, &m_rate); 2544 do_usleep(10000); 2545 } 2546 2547 fio_idle_prof_stop(); 2548 2549 update_io_ticks(); 2550 } 2551 2552 static void free_disk_util(void) 2553 { 2554 disk_util_prune_entries(); 2555 helper_thread_destroy(); 2556 } 2557 2558 int fio_backend(struct sk_out *sk_out) 2559 { 2560 struct thread_data *td; 2561 int i; 2562 2563 if (exec_profile) { 2564 if (load_profile(exec_profile)) 2565 return 1; 2566 free(exec_profile); 2567 exec_profile = NULL; 2568 } 2569 if (!thread_number) 2570 return 0; 2571 2572 if (write_bw_log) { 2573 struct log_params p = { 2574 .log_type = IO_LOG_TYPE_BW, 2575 }; 2576 2577 setup_log(&agg_io_log[DDIR_READ], &p, "agg-read_bw.log"); 2578 setup_log(&agg_io_log[DDIR_WRITE], &p, "agg-write_bw.log"); 2579 setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log"); 2580 } 2581 2582 startup_sem = fio_sem_init(FIO_SEM_LOCKED); 2583 if (!sk_out) 2584 is_local_backend = true; 2585 if (startup_sem == NULL) 2586 return 1; 2587 2588 set_genesis_time(); 2589 stat_init(); 2590 if (helper_thread_create(startup_sem, sk_out)) 2591 log_err("fio: failed to create helper thread\n"); 2592 2593 cgroup_list = smalloc(sizeof(*cgroup_list)); 2594 if (cgroup_list) 2595 INIT_FLIST_HEAD(cgroup_list); 2596 2597 run_threads(sk_out); 2598 2599 helper_thread_exit(); 2600 2601 if (!fio_abort) { 2602 __show_run_stats(); 2603 if (write_bw_log) { 2604 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 2605 struct io_log *log = agg_io_log[i]; 2606 2607 flush_log(log, false); 2608 free_log(log); 2609 } 2610 } 2611 } 2612 2613 for_each_td(td, i) { 2614 steadystate_free(td); 2615 fio_options_free(td); 2616 fio_dump_options_free(td); 2617 if (td->rusage_sem) { 2618 fio_sem_remove(td->rusage_sem); 2619 td->rusage_sem = NULL; 2620 } 2621 fio_sem_remove(td->sem); 2622 td->sem = NULL; 2623 } 2624 2625 free_disk_util(); 2626 if (cgroup_list) { 2627 cgroup_kill(cgroup_list); 2628 sfree(cgroup_list); 2629 } 2630 2631 fio_sem_remove(startup_sem); 2632 stat_exit(); 2633 return exit_value; 2634 } 2635