/*
 * QEMU System Emulator block driver
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
#include "block/blockjob_int.h"
#include "block/block_int.h"
#include "block/trace.h"
#include "sysemu/block-backend.h"
#include "qapi/error.h"
#include "qapi/qapi-events-block-core.h"
#include "qapi/qmp/qerror.h"
#include "qemu/coroutine.h"
#include "qemu/timer.h"

/* Right now, this mutex is only needed to synchronize accesses to job->busy
 * and job->sleep_timer, such as concurrent calls to block_job_do_yield and
 * block_job_enter.
 */
static QemuMutex block_job_mutex;

/* Acquire the global block job mutex (see comment above). */
static void block_job_lock(void)
{
    qemu_mutex_lock(&block_job_mutex);
}

/* Release the global block job mutex. */
static void block_job_unlock(void)
{
    qemu_mutex_unlock(&block_job_mutex);
}

/* Runs before main(): the mutex must exist before any job is created. */
static void __attribute__((__constructor__)) block_job_init(void)
{
    qemu_mutex_init(&block_job_mutex);
}

static void block_job_event_cancelled(BlockJob *job);
static void block_job_event_completed(BlockJob *job, const char *msg);
static int block_job_event_pending(BlockJob *job);
static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job));

/* Transactional group of block jobs */
struct BlockJobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs */
    QLIST_HEAD(, BlockJob) jobs;

    /* Reference count */
    int refcnt;
};

/*
 * The block job API is composed of two categories of functions.
 *
 * The first includes functions used by the monitor.  The monitor is
 * peculiar in that it accesses the block job list with block_job_get, and
 * therefore needs consistency across block_job_get and the actual operation
 * (e.g. block_job_set_speed).  The consistency is achieved with
 * aio_context_acquire/release.  These functions are declared in blockjob.h.
 *
 * The second includes functions used by the block job drivers and sometimes
 * by the core block layer.  These do not care about locking, because the
 * whole coroutine runs under the AioContext lock, and are declared in
 * blockjob_int.h.
 */

/* Return true if @job is one of the Job types driven by this file. */
static bool is_block_job(Job *job)
{
    return job_type(job) == JOB_TYPE_BACKUP ||
           job_type(job) == JOB_TYPE_COMMIT ||
           job_type(job) == JOB_TYPE_MIRROR ||
           job_type(job) == JOB_TYPE_STREAM;
}

/*
 * Return the block job following @bjob in the global job list, skipping
 * Job instances that are not block jobs.  With @bjob == NULL, return the
 * first block job; returns NULL when there are no more block jobs.
 */
BlockJob *block_job_next(BlockJob *bjob)
{
    Job *job = bjob ? &bjob->job : NULL;

    do {
        job = job_next(job);
    } while (job && !is_block_job(job));

    return job ? container_of(job, BlockJob, job) : NULL;
}

/* Look up a block job by @id; NULL if not found or not a block job. */
BlockJob *block_job_get(const char *id)
{
    Job *job = job_get(id);

    if (job && is_block_job(job)) {
        return container_of(job, BlockJob, job);
    } else {
        return NULL;
    }
}

/* Allocate a new, empty transaction; the single reference is the caller's. */
BlockJobTxn *block_job_txn_new(void)
{
    BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
    QLIST_INIT(&txn->jobs);
    txn->refcnt = 1;
    return txn;
}

static void block_job_txn_ref(BlockJobTxn *txn)
{
    txn->refcnt++;
}

/* Drop one reference; frees the transaction when the count reaches zero. */
void block_job_txn_unref(BlockJobTxn *txn)
{
    if (txn && --txn->refcnt == 0) {
        g_free(txn);
    }
}

/*
 * Add @job to @txn (no-op when @txn is NULL).  The transaction holds one
 * reference per job on its list; released by block_job_txn_del_job().
 */
void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
{
    if (!txn) {
        return;
    }

    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    block_job_txn_ref(txn);
}

/* Unlink @job from its transaction, if any, dropping the txn reference. */
static void block_job_txn_del_job(BlockJob *job)
{
    if (job->txn) {
        QLIST_REMOVE(job, txn_list);
        block_job_txn_unref(job->txn);
        job->txn = NULL;
    }
}

/* Assumes the block_job_mutex is held */
static bool block_job_timer_pending(BlockJob *job)
{
    return timer_pending(&job->sleep_timer);
}

/* Assumes the block_job_mutex is held */
static bool block_job_timer_not_pending(BlockJob *job)
{
    return !block_job_timer_pending(job);
}

/* Request a pause at the job's next pause point (counted; see resume). */
static void block_job_pause(BlockJob *job)
{
    job->pause_count++;
}

/* Undo one block_job_pause(); wakes the coroutine on the last resume. */
static void block_job_resume(BlockJob *job)
{
    assert(job->pause_count > 0);
    job->pause_count--;
    if (job->pause_count) {
        return;
    }

    /* kick only if no timer is pending */
    block_job_enter_cond(job, block_job_timer_not_pending);
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque);
static void block_job_detach_aio_context(void *opaque);

/*
 * Job.free() implementation for block jobs: detach from the BDS and
 * release all BlockJob-owned resources.  The sleep timer must already
 * have fired or been deleted by this point.
 */
void block_job_free(Job *job)
{
    BlockJob *bjob = container_of(job, BlockJob, job);
    BlockDriverState *bs = blk_bs(bjob->blk);

    assert(!bjob->txn);

    bs->job = NULL;
    block_job_remove_all_bdrv(bjob);
    blk_remove_aio_context_notifier(bjob->blk,
                                    block_job_attached_aio_context,
                                    block_job_detach_aio_context, bjob);
    blk_unref(bjob->blk);
    error_free(bjob->blocker);
    assert(!timer_pending(&bjob->sleep_timer));
}

/* AioContext notifier: the job's BlockBackend moved to @new_context. */
static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque)
{
    BlockJob *job = opaque;

    job->job.aio_context = new_context;
    if (job->driver->attached_aio_context) {
        job->driver->attached_aio_context(job, new_context);
    }

    /* Balances the block_job_pause() in block_job_detach_aio_context() */
    block_job_resume(job);
}

/* Drive the job towards a quiescent point. */
static void block_job_drain(BlockJob *job)
{
    /* If job is !job->busy this kicks it into the next pause point. */
    block_job_enter(job);

    blk_drain(job->blk);
    if (job->driver->drain) {
        job->driver->drain(job);
    }
}

/* AioContext notifier: pause the job while its backend has no context. */
static void block_job_detach_aio_context(void *opaque)
{
    BlockJob *job = opaque;

    /* In case the job terminates during aio_poll()...
 */
    job_ref(&job->job);

    block_job_pause(job);

    /* Drain until the job has actually reached its pause point (or is done) */
    while (!job->paused && !job->completed) {
        block_job_drain(job);
    }

    job->job.aio_context = NULL;
    job_unref(&job->job);
}

/* Describe the job as a BdrvChild parent, e.g. "stream job 'job0'". */
static char *child_job_get_parent_desc(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    return g_strdup_printf("%s job '%s'", job_type_str(&job->job), job->job.id);
}

/* Draining a node the job is attached to pauses the job... */
static void child_job_drained_begin(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    block_job_pause(job);
}

/* ...and ending the drained section resumes it. */
static void child_job_drained_end(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    block_job_resume(job);
}

/* BdrvChildRole for every node the job attaches itself to */
static const BdrvChildRole child_job = {
    .get_parent_desc = child_job_get_parent_desc,
    .drained_begin   = child_job_drained_begin,
    .drained_end     = child_job_drained_end,
    .stay_at_node    = true,
};

/* Detach the job from every node added with block_job_add_bdrv(). */
void block_job_remove_all_bdrv(BlockJob *job)
{
    GSList *l;
    for (l = job->nodes; l; l = l->next) {
        BdrvChild *c = l->data;
        bdrv_op_unblock_all(c->bs, job->blocker);
        bdrv_root_unref_child(c);
    }
    g_slist_free(job->nodes);
    job->nodes = NULL;
}

/*
 * Attach @bs to @job as a child named @name with the given permission
 * masks, blocking all operations on the node via job->blocker.
 * Returns 0 on success, -EPERM if permissions were refused (@errp set).
 */
int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
                       uint64_t perm, uint64_t shared_perm, Error **errp)
{
    BdrvChild *c;

    c = bdrv_root_attach_child(bs, name, &child_job, perm, shared_perm,
                               job, errp);
    if (c == NULL) {
        return -EPERM;
    }

    job->nodes = g_slist_prepend(job->nodes, c);
    bdrv_ref(bs);
    bdrv_op_block_all(bs, job->blocker);

    return 0;
}

/* Internal jobs have no ID and are invisible to the QMP interface. */
bool block_job_is_internal(BlockJob *job)
{
    return (job->job.id == NULL);
}

/* True once block_job_start() has created the job coroutine. */
static bool block_job_started(BlockJob *job)
{
    return job->co;
}

const BlockJobDriver *block_job_driver(BlockJob *job)
{
    return job->driver;
}

/**
 * All jobs must allow a pause point before entering their job proper.
   This
 * ensures that jobs can be paused prior to being started, then resumed later.
 */
static void coroutine_fn block_job_co_entry(void *opaque)
{
    BlockJob *job = opaque;

    assert(job && job->driver && job->driver->start);
    block_job_pause_point(job);
    job->driver->start(job);
}

/* Expiry callback for job->sleep_timer: wake the sleeping coroutine. */
static void block_job_sleep_timer_cb(void *opaque)
{
    BlockJob *job = opaque;

    block_job_enter(job);
}

/* Create the job coroutine and let it run (CREATED -> RUNNING). */
void block_job_start(BlockJob *job)
{
    assert(job && !block_job_started(job) && job->paused &&
           job->driver && job->driver->start);
    job->co = qemu_coroutine_create(block_job_co_entry, job);
    /* Drop the implicit pause taken in block_job_create() */
    job->pause_count--;
    job->busy = true;
    job->paused = false;
    job_state_transition(&job->job, JOB_STATUS_RUNNING);
    bdrv_coroutine_enter(blk_bs(job->blk), job->co);
}

/* Final teardown: mark the job dead (NULL state) and drop its reference. */
static void block_job_decommission(BlockJob *job)
{
    assert(job);
    job->completed = true;
    job->busy = false;
    job->paused = false;
    job->job.deferred_to_main_loop = true;
    block_job_txn_del_job(job);
    job_state_transition(&job->job, JOB_STATUS_NULL);
    job_unref(&job->job);
}

static void block_job_do_dismiss(BlockJob *job)
{
    block_job_decommission(job);
}

/* Enter CONCLUDED; auto-dismiss unless the user asked to keep the job. */
static void block_job_conclude(BlockJob *job)
{
    job_state_transition(&job->job, JOB_STATUS_CONCLUDED);
    if (job->auto_dismiss || !block_job_started(job)) {
        block_job_do_dismiss(job);
    }
}

/* Fold cancellation into job->ret and enter ABORTING on any failure. */
static void block_job_update_rc(BlockJob *job)
{
    if (!job->ret && job_is_cancelled(&job->job)) {
        job->ret = -ECANCELED;
    }
    if (job->ret) {
        job_state_transition(&job->job, JOB_STATUS_ABORTING);
    }
}

/* Transaction phase 1: let the driver fail the job before any commit. */
static int block_job_prepare(BlockJob *job)
{
    if (job->ret == 0 && job->driver->prepare) {
        job->ret = job->driver->prepare(job);
    }
    return job->ret;
}

/* Invoke the driver's commit hook; only valid for a successful job. */
static void block_job_commit(BlockJob *job)
{
    assert(!job->ret);
    if (job->driver->commit) {
        job->driver->commit(job);
    }
}

/* Invoke the driver's abort hook; only valid for a failed job. */
static void block_job_abort(BlockJob *job)
{
    assert(job->ret);
    if (job->driver->abort) {
        job->driver->abort(job);
    }
}

/* Invoke the driver's clean hook; runs after both commit and abort. */
static void block_job_clean(BlockJob *job)
{
    if (job->driver->clean) {
        job->driver->clean(job);
    }
}

/*
 * Run commit-or-abort plus clean for one completed job, invoke its
 * completion callback, emit the QMP completion/cancellation event, unlink
 * it from its transaction and conclude it.  Always returns 0, so a
 * block_job_txn_apply() pass over a transaction visits every job.
 */
static int block_job_finalize_single(BlockJob *job)
{
    assert(job->completed);

    /* Ensure abort is called for late-transactional failures */
    block_job_update_rc(job);

    if (!job->ret) {
        block_job_commit(job);
    } else {
        block_job_abort(job);
    }
    block_job_clean(job);

    if (job->cb) {
        job->cb(job->opaque, job->ret);
    }

    /* Emit events only if we actually started */
    if (block_job_started(job)) {
        if (job_is_cancelled(&job->job)) {
            block_job_event_cancelled(job);
        } else {
            const char *msg = NULL;
            if (job->ret < 0) {
                msg = strerror(-job->ret);
            }
            block_job_event_completed(job, msg);
        }
    }

    block_job_txn_del_job(job);
    block_job_conclude(job);
    return 0;
}

/* Flag the job cancelled without entering its coroutine. */
static void block_job_cancel_async(BlockJob *job, bool force)
{
    if (job->iostatus != BLOCK_DEVICE_IO_STATUS_OK) {
        block_job_iostatus_reset(job);
    }
    if (job->user_paused) {
        /* Do not call block_job_enter here, the caller will handle it.
 */
        job->user_paused = false;
        job->pause_count--;
    }
    job->job.cancelled = true;
    /* To prevent 'force == false' overriding a previous 'force == true' */
    job->force |= force;
}

/*
 * Apply @fn to every job in @txn, stopping at the first non-zero return
 * value (which is also this function's return value).  When @lock is true,
 * each job's AioContext is acquired around its callback.
 */
static int block_job_txn_apply(BlockJobTxn *txn, int fn(BlockJob *), bool lock)
{
    AioContext *ctx;
    BlockJob *job, *next;
    int rc = 0;

    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
        if (lock) {
            ctx = blk_get_aio_context(job->blk);
            aio_context_acquire(ctx);
        }
        rc = fn(job);
        if (lock) {
            aio_context_release(ctx);
        }
        if (rc) {
            break;
        }
    }
    return rc;
}

/*
 * Run @finish on @job, then drain synchronously until the job completes
 * or defers to the main loop.  Returns the job's final return value,
 * -ECANCELED when it was cancelled with ret == 0, or -EBUSY when @finish
 * itself failed (the failure propagated to @errp).
 */
static int block_job_finish_sync(BlockJob *job,
                                 void (*finish)(BlockJob *, Error **errp),
                                 Error **errp)
{
    Error *local_err = NULL;
    int ret;

    assert(blk_bs(job->blk)->job == job);

    /* Keep the job alive across the drain/poll loops below */
    job_ref(&job->job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        job_unref(&job->job);
        return -EBUSY;
    }
    /* block_job_drain calls block_job_enter, and it should be enough to
     * induce progress until the job completes or moves to the main thread.
     */
    while (!job->job.deferred_to_main_loop && !job->completed) {
        block_job_drain(job);
    }
    while (!job->completed) {
        aio_poll(qemu_get_aio_context(), true);
    }
    ret = (job_is_cancelled(&job->job) && job->ret == 0)
          ? -ECANCELED : job->ret;
    job_unref(&job->job);
    return ret;
}

/*
 * One job in @job's transaction failed: cancel and finalize every job in
 * the transaction.  Re-entrant calls (triggered by the cancellations done
 * here) are cut short by the txn->aborting flag.
 */
static void block_job_completed_txn_abort(BlockJob *job)
{
    AioContext *ctx;
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    block_job_txn_ref(txn);

    /* We are the first failed job. Cancel other jobs.
 */
    /* Acquire the AioContext of every job in the transaction up front;
     * each one is released as its job is finalized in the loop below. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        ctx = blk_get_aio_context(other_job->blk);
        aio_context_acquire(ctx);
    }

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            block_job_cancel_async(other_job, false);
        }
    }
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        ctx = blk_get_aio_context(other_job->blk);
        if (!other_job->completed) {
            assert(job_is_cancelled(&other_job->job));
            block_job_finish_sync(other_job, NULL, NULL);
        }
        /* finalize_single() removes the job from txn->jobs via
         * block_job_txn_del_job(), so this loop makes progress. */
        block_job_finalize_single(other_job);
        aio_context_release(ctx);
    }

    block_job_txn_unref(txn);
}

/* Non-zero when the job requires an explicit job-finalize from the user. */
static int block_job_needs_finalize(BlockJob *job)
{
    return !job->auto_finalize;
}

/* Prepare every job in the transaction, then commit (or abort) them all. */
static void block_job_do_finalize(BlockJob *job)
{
    int rc;
    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = block_job_txn_apply(job->txn, block_job_prepare, true);
    if (rc) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_txn_apply(job->txn, block_job_finalize_single, true);
    }
}

/*
 * Called when @job succeeds: enter WAITING, and once the last job of the
 * transaction has completed, move the whole transaction forward.
 */
static void block_job_completed_txn_success(BlockJob *job)
{
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job;

    job_state_transition(&job->job, JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
 */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!other_job->completed) {
            return;
        }
        assert(other_job->ret == 0);
    }

    /* Every job succeeded: move them all to PENDING (emitting events) */
    block_job_txn_apply(txn, block_job_event_pending, false);

    /* If no jobs need manual finalization, automatically do so */
    if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) {
        block_job_do_finalize(job);
    }
}

/*
 * Set the job's rate limit to @speed bytes per second; rejects negative
 * values via @errp.  A sleeping job is only kicked when the limit was
 * effectively raised (see block_job_ratelimit_get_delay()).
 */
void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
    int64_t old_speed = job->speed;

    if (job_apply_verb(&job->job, JOB_VERB_SET_SPEED, errp)) {
        return;
    }
    if (speed < 0) {
        error_setg(errp, QERR_INVALID_PARAMETER, "speed");
        return;
    }

    ratelimit_set_speed(&job->limit, speed, BLOCK_JOB_SLICE_TIME);

    job->speed = speed;
    if (speed && speed <= old_speed) {
        return;
    }

    /* kick only if a timer is pending */
    block_job_enter_cond(job, block_job_timer_pending);
}

/* Delay (in ns) the job should sleep after processing @n bytes; 0 when
 * no speed limit is set. */
int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
{
    if (!job->speed) {
        return 0;
    }

    return ratelimit_calculate_delay(&job->limit, n);
}

/* QMP block-job-complete: ask the driver to gracefully complete the job. */
void block_job_complete(BlockJob *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->job.id);
    if (job_apply_verb(&job->job, JOB_VERB_COMPLETE, errp)) {
        return;
    }
    if (job->pause_count || job_is_cancelled(&job->job) ||
        !job->driver->complete)
    {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->job.id);
        return;
    }

    job->driver->complete(job, errp);
}

/* QMP entry point for manually finalizing a PENDING job's transaction. */
void block_job_finalize(BlockJob *job, Error **errp)
{
    assert(job && job->job.id);
    if (job_apply_verb(&job->job, JOB_VERB_FINALIZE, errp)) {
        return;
    }
    block_job_do_finalize(job);
}

/* QMP block-job-dismiss: free a CONCLUDED job and clear the caller's ref. */
void block_job_dismiss(BlockJob **jobptr, Error **errp)
{
    BlockJob *job = *jobptr;
    /* similarly to _complete, this
       is QMP-interface only. */
    assert(job->job.id);
    if (job_apply_verb(&job->job, JOB_VERB_DISMISS, errp)) {
        return;
    }

    block_job_do_dismiss(job);
    *jobptr = NULL;
}

/* QMP block-job-pause: pause on behalf of the user (at most one at a time). */
void block_job_user_pause(BlockJob *job, Error **errp)
{
    if (job_apply_verb(&job->job, JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    block_job_pause(job);
}

bool block_job_user_paused(BlockJob *job)
{
    return job->user_paused;
}

/* QMP block-job-resume: undo a user pause and clear any error iostatus. */
void block_job_user_resume(BlockJob *job, Error **errp)
{
    assert(job);
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (job_apply_verb(&job->job, JOB_VERB_RESUME, errp)) {
        return;
    }
    block_job_iostatus_reset(job);
    job->user_paused = false;
    block_job_resume(job);
}

/*
 * Cancel @job.  A CONCLUDED job is simply dismissed; otherwise the
 * cancellation is flagged asynchronously and the job is completed, aborted
 * or woken up depending on how far it has progressed.
 */
void block_job_cancel(BlockJob *job, bool force)
{
    if (job->job.status == JOB_STATUS_CONCLUDED) {
        block_job_do_dismiss(job);
        return;
    }
    block_job_cancel_async(job, force);
    if (!block_job_started(job)) {
        /* The coroutine never ran: complete it here with -ECANCELED */
        block_job_completed(job, -ECANCELED);
    } else if (job->job.deferred_to_main_loop) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_enter(job);
    }
}

/* QMP block-job-cancel. */
void block_job_user_cancel(BlockJob *job, bool force, Error **errp)
{
    if (job_apply_verb(&job->job, JOB_VERB_CANCEL, errp)) {
        return;
    }
    block_job_cancel(job, force);
}

/* A wrapper around block_job_cancel() taking an Error ** parameter so it may be
 * used with block_job_finish_sync() without the need for (rather nasty)
 * function pointer casts there.
 */
static void block_job_cancel_err(BlockJob *job, Error **errp)
{
    block_job_cancel(job, false);
}

/* Cancel @job and wait for it to finish; returns its final return value. */
int block_job_cancel_sync(BlockJob *job)
{
    return block_job_finish_sync(job, &block_job_cancel_err, NULL);
}

/* Synchronously cancel every block job in the global list. */
void block_job_cancel_sync_all(void)
{
    BlockJob *job;
    AioContext *aio_context;

    while ((job = block_job_next(NULL))) {
        aio_context = blk_get_aio_context(job->blk);
        aio_context_acquire(aio_context);
        block_job_cancel_sync(job);
        aio_context_release(aio_context);
    }
}

/* Complete @job (see block_job_complete()) and wait for it to finish. */
int block_job_complete_sync(BlockJob *job, Error **errp)
{
    return block_job_finish_sync(job, &block_job_complete, errp);
}

/* Progress reporting: @done more bytes have been processed. */
void block_job_progress_update(BlockJob *job, uint64_t done)
{
    job->offset += done;
}

/* Progress reporting: @remaining bytes are still left to process. */
void block_job_progress_set_remaining(BlockJob *job, uint64_t remaining)
{
    job->len = job->offset + remaining;
}

/*
 * Build the QMP BlockJobInfo for @job; the caller owns the result.
 * Internal (ID-less) jobs cannot be queried and yield NULL with @errp set.
 */
BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
{
    BlockJobInfo *info;

    if (block_job_is_internal(job)) {
        error_setg(errp, "Cannot query QEMU internal jobs");
        return NULL;
    }
    info = g_new0(BlockJobInfo, 1);
    info->type      = g_strdup(job_type_str(&job->job));
    info->device    = g_strdup(job->job.id);
    info->len       = job->len;
    info->busy      = atomic_read(&job->busy);
    info->paused    = job->pause_count > 0;
    info->offset    = job->offset;
    info->speed     = job->speed;
    info->io_status = job->iostatus;
    info->ready     = job->ready;
    info->status    = job->job.status;
    info->auto_finalize = job->auto_finalize;
    info->auto_dismiss  = job->auto_dismiss;
    info->has_error = job->ret != 0;
    info->error     = job->ret ? g_strdup(strerror(-job->ret)) : NULL;
    return info;
}

/* Latch the first I/O error into the job's iostatus (ENOSPC vs. generic). */
static void block_job_iostatus_set_err(BlockJob *job, int error)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
                                          BLOCK_DEVICE_IO_STATUS_FAILED;
    }
}

/* Emit the QMP BLOCK_JOB_CANCELLED event (suppressed for internal jobs). */
static void block_job_event_cancelled(BlockJob *job)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_cancelled(job_type(&job->job),
                                        job->job.id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        &error_abort);
}

/* Emit BLOCK_JOB_COMPLETED; @msg is the error string, or NULL on success. */
static void block_job_event_completed(BlockJob *job, const char *msg)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_completed(job_type(&job->job),
                                        job->job.id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        !!msg,
                                        msg,
                                        &error_abort);
}

/*
 * Move to PENDING, emitting BLOCK_JOB_PENDING when finalization is manual.
 * Returns 0 so a block_job_txn_apply() pass visits every job.
 */
static int block_job_event_pending(BlockJob *job)
{
    job_state_transition(&job->job, JOB_STATUS_PENDING);
    if (!job->auto_finalize && !block_job_is_internal(job)) {
        qapi_event_send_block_job_pending(job_type(&job->job),
                                          job->job.id,
                                          &error_abort);
    }
    return 0;
}

/*
 * API for block job drivers and the block layer. These functions are
 * declared in blockjob_int.h.
 */

/*
 * Create a block job driven by @driver on @bs, wrapped in a fresh
 * BlockBackend with the given permissions.  The job starts out paused
 * (pause_count == 1) until block_job_start() is called.  Returns the new
 * job (as void * — NOTE(review): presumably so callers can assign it to
 * their driver-specific job struct without a cast; confirm against
 * blockjob_int.h) or NULL with @errp set on failure.
 */
void *block_job_create(const char *job_id, const BlockJobDriver *driver,
                       BlockJobTxn *txn, BlockDriverState *bs, uint64_t perm,
                       uint64_t shared_perm, int64_t speed, int flags,
                       BlockCompletionFunc *cb, void *opaque, Error **errp)
{
    BlockBackend *blk;
    BlockJob *job;
    int ret;

    if (bs->job) {
        error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs));
        return NULL;
    }

    /* External jobs without an explicit ID default to the device name */
    if (job_id == NULL && !(flags & BLOCK_JOB_INTERNAL)) {
        job_id = bdrv_get_device_name(bs);
        if (!*job_id) {
            error_setg(errp, "An explicit job ID is required for this node");
            return NULL;
        }
    }

    if (job_id) {
        if (flags & BLOCK_JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal block job");
            return NULL;
        }
    }

    blk = blk_new(perm, shared_perm);
    ret = blk_insert_bs(blk, bs, errp);
    if (ret < 0) {
        blk_unref(blk);
        return NULL;
    }

    job = job_create(job_id, &driver->job_driver, blk_get_aio_context(blk),
                     errp);
    if (job == NULL) {
        blk_unref(blk);
        return NULL;
    }

    assert(is_block_job(&job->job));
    assert(job->job.driver->free == &block_job_free);

    job->driver        = driver;
    job->blk           = blk;
    job->cb            = cb;
    job->opaque        = opaque;
    job->busy          = false;
    /* Created implicitly paused; block_job_start() lifts this pause */
    job->paused        = true;
    job->pause_count   = 1;
    job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE);
    job->auto_dismiss  = !(flags & BLOCK_JOB_MANUAL_DISMISS);
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   block_job_sleep_timer_cb, job);

    error_setg(&job->blocker, "block device is in use by block job: %s",
               job_type_str(&job->job));
    block_job_add_bdrv(job, "main node", bs, 0, BLK_PERM_ALL, &error_abort);
    bs->job = job;

    bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);

    blk_add_aio_context_notifier(blk, block_job_attached_aio_context,
                                 block_job_detach_aio_context, job);

    /* Only set speed when necessary to avoid NotSupported error */
    if (speed != 0) {
        Error *local_err = NULL;

        block_job_set_speed(job, speed, &local_err);
        if (local_err) {
            block_job_early_fail(job);
            error_propagate(errp, local_err);
            return NULL;
        }
    }

    /* Single jobs are modeled as single-job transactions for sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = block_job_txn_new();
        block_job_txn_add_job(txn, job);
        block_job_txn_unref(txn);
    } else {
        block_job_txn_add_job(txn, job);
    }

    return job;
}

/* Tear down a job that failed before its coroutine was ever started. */
void block_job_early_fail(BlockJob *job)
{
    assert(job->job.status == JOB_STATUS_CREATED);
    block_job_decommission(job);
}

/* Driver API: the job's work is done and @ret is its result. */
void block_job_completed(BlockJob *job, int ret)
{
    assert(job && job->txn && !job->completed);
    assert(blk_bs(job->blk)->job == job);
    job->completed = true;
    job->ret = ret;
    block_job_update_rc(job);
    trace_block_job_completed(job, ret, job->ret);
    if (job->ret) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_completed_txn_success(job);
    }
}

static bool block_job_should_pause(BlockJob *job)
{
    return job->pause_count > 0;
}

/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with block_job_enter() before the timer has
 * expired is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
 * called explicitly. */
static void block_job_do_yield(BlockJob *job, uint64_t ns)
{
    /* busy and sleep_timer are protected by block_job_mutex (see top) */
    block_job_lock();
    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    job->busy = false;
    block_job_unlock();
    qemu_coroutine_yield();

    /* Set by block_job_enter before re-entering the coroutine.
 */
    assert(job->busy);
}

/*
 * Pause point: if a pause was requested, yield here until resumed.
 * Must be called from the job coroutine; invokes the driver's
 * pause/resume hooks around the yield.
 */
void coroutine_fn block_job_pause_point(BlockJob *job)
{
    assert(job && block_job_started(job));

    if (!block_job_should_pause(job)) {
        return;
    }
    if (job_is_cancelled(&job->job)) {
        return;
    }

    if (job->driver->pause) {
        job->driver->pause(job);
    }

    if (block_job_should_pause(job) && !job_is_cancelled(&job->job)) {
        JobStatus status = job->job.status;
        /* READY jobs pause into STANDBY, all others into PAUSED */
        job_state_transition(&job->job, status == JOB_STATUS_READY
                                        ? JOB_STATUS_STANDBY
                                        : JOB_STATUS_PAUSED);
        job->paused = true;
        block_job_do_yield(job, -1);
        job->paused = false;
        /* Restore the pre-pause status */
        job_state_transition(&job->job, status);
    }

    if (job->driver->resume) {
        job->driver->resume(job);
    }
}

/*
 * Conditionally enter a block_job pending a call to fn() while
 * under the block_job_lock critical section.
 */
static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job))
{
    if (!block_job_started(job)) {
        return;
    }
    if (job->job.deferred_to_main_loop) {
        return;
    }

    block_job_lock();
    if (job->busy) {
        /* Already running: nothing to wake */
        block_job_unlock();
        return;
    }

    if (fn && !fn(job)) {
        block_job_unlock();
        return;
    }

    assert(!job->job.deferred_to_main_loop);
    timer_del(&job->sleep_timer);
    /* Must be set before the wakeup: block_job_do_yield() asserts it */
    job->busy = true;
    block_job_unlock();
    aio_co_wake(job->co);
}

/* Unconditionally wake the job coroutine if it is currently yielded. */
void block_job_enter(BlockJob *job)
{
    block_job_enter_cond(job, NULL);
}

/*
 * Sleep for @ns nanoseconds (unless cancelled) and then honour any
 * pending pause request.  Must be called from the job coroutine.
 */
void block_job_sleep_ns(BlockJob *job, int64_t ns)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!
 */
    if (job_is_cancelled(&job->job)) {
        return;
    }

    if (!block_job_should_pause(job)) {
        block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    block_job_pause_point(job);
}

/* Yield until block_job_enter() is called, then honour pause requests. */
void block_job_yield(BlockJob *job)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too! */
    if (job_is_cancelled(&job->job)) {
        return;
    }

    if (!block_job_should_pause(job)) {
        block_job_do_yield(job, -1);
    }

    block_job_pause_point(job);
}

/* Clear an error iostatus; only valid while the job is user-paused. */
void block_job_iostatus_reset(BlockJob *job)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        return;
    }
    assert(job->user_paused && job->pause_count > 0);
    job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
}

/* Driver API: announce the READY state and emit BLOCK_JOB_READY. */
void block_job_event_ready(BlockJob *job)
{
    job_state_transition(&job->job, JOB_STATUS_READY);
    job->ready = true;

    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_ready(job_type(&job->job),
                                    job->job.id,
                                    job->len,
                                    job->offset,
                                    job->speed, &error_abort);
}

/*
 * Map an I/O error to the action configured by @on_err, emit the
 * BLOCK_JOB_ERROR QMP event, and — for the STOP action — pause the job
 * user-visibly while recording the error in its iostatus.
 * Returns the chosen action.
 */
BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                        int is_read, int error)
{
    BlockErrorAction action;

    switch (on_err) {
    case BLOCKDEV_ON_ERROR_ENOSPC:
    case BLOCKDEV_ON_ERROR_AUTO:
        action = (error == ENOSPC) ?
                 BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_STOP:
        action = BLOCK_ERROR_ACTION_STOP;
        break;
    case BLOCKDEV_ON_ERROR_REPORT:
        action = BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_IGNORE:
        action = BLOCK_ERROR_ACTION_IGNORE;
        break;
    default:
        abort();
    }
    if (!block_job_is_internal(job)) {
        qapi_event_send_block_job_error(job->job.id,
                                        is_read ? IO_OPERATION_TYPE_READ :
                                                  IO_OPERATION_TYPE_WRITE,
                                        action, &error_abort);
    }
    if (action == BLOCK_ERROR_ACTION_STOP) {
        block_job_pause(job);
        /* make the pause user visible, which will be resumed from QMP. */
        job->user_paused = true;
        block_job_iostatus_set_err(job, error);
    }
    return action;
}