/*
 * QEMU System Emulator block driver
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
#include "block/blockjob_int.h"
#include "block/block_int.h"
#include "block/trace.h"
#include "sysemu/block-backend.h"
#include "qapi/error.h"
#include "qapi/qapi-events-block-core.h"
#include "qapi/qmp/qerror.h"
#include "qemu/coroutine.h"
#include "qemu/timer.h"

/* Right now, this mutex is only needed to synchronize accesses to job->busy
 * and job->sleep_timer, such as concurrent calls to block_job_do_yield and
 * block_job_enter. */
static QemuMutex block_job_mutex;

static void block_job_lock(void)
{
    qemu_mutex_lock(&block_job_mutex);
}

static void block_job_unlock(void)
{
    qemu_mutex_unlock(&block_job_mutex);
}

static void __attribute__((__constructor__)) block_job_init(void)
{
    qemu_mutex_init(&block_job_mutex);
}

static void block_job_event_cancelled(BlockJob *job);
static void block_job_event_completed(BlockJob *job, const char *msg);
static int block_job_event_pending(BlockJob *job);
static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job));

/* Transactional group of block jobs */
struct BlockJobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs */
    QLIST_HEAD(, BlockJob) jobs;

    /* Reference count */
    int refcnt;
};
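
/*
 * Illustrative sketch (not additional API): how a caller ties two jobs
 * into one transaction.  block_job_txn_add_job() takes a txn reference
 * for each job, so the caller may drop its initial reference afterwards;
 * block_job_create() below follows the same pattern for single-job
 * transactions.
 *
 *     BlockJobTxn *txn = block_job_txn_new();
 *     block_job_txn_add_job(txn, job1);
 *     block_job_txn_add_job(txn, job2);
 *     block_job_txn_unref(txn);   // the jobs now keep the txn alive
 */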

/*
 * The block job API is composed of two categories of functions.
 *
 * The first includes functions used by the monitor.  The monitor is
 * peculiar in that it accesses the block job list with block_job_get, and
 * therefore needs consistency across block_job_get and the actual operation
 * (e.g. block_job_set_speed).  The consistency is achieved with
 * aio_context_acquire/release.  These functions are declared in blockjob.h.
 *
 * The second includes functions used by the block job drivers and sometimes
 * by the core block layer.  These do not care about locking, because the
 * whole coroutine runs under the AioContext lock, and are declared in
 * blockjob_int.h.
 */
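
/*
 * As an illustration of the first category, a monitor-side caller looks
 * roughly like this (sketch only; the real QMP handlers live in
 * blockdev.c):
 *
 *     BlockJob *job = block_job_get(id);
 *     if (job) {
 *         AioContext *ctx = blk_get_aio_context(job->blk);
 *         aio_context_acquire(ctx);
 *         block_job_set_speed(job, speed, errp);
 *         aio_context_release(ctx);
 *     }
 */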

static bool is_block_job(Job *job)
{
    return job_type(job) == JOB_TYPE_BACKUP ||
           job_type(job) == JOB_TYPE_COMMIT ||
           job_type(job) == JOB_TYPE_MIRROR ||
           job_type(job) == JOB_TYPE_STREAM;
}

BlockJob *block_job_next(BlockJob *bjob)
{
    Job *job = bjob ? &bjob->job : NULL;

    do {
        job = job_next(job);
    } while (job && !is_block_job(job));

    return job ? container_of(job, BlockJob, job) : NULL;
}

BlockJob *block_job_get(const char *id)
{
    Job *job = job_get(id);

    if (job && is_block_job(job)) {
        return container_of(job, BlockJob, job);
    } else {
        return NULL;
    }
}

BlockJobTxn *block_job_txn_new(void)
{
    BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
    QLIST_INIT(&txn->jobs);
    txn->refcnt = 1;
    return txn;
}

static void block_job_txn_ref(BlockJobTxn *txn)
{
    txn->refcnt++;
}

void block_job_txn_unref(BlockJobTxn *txn)
{
    if (txn && --txn->refcnt == 0) {
        g_free(txn);
    }
}

void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
{
    if (!txn) {
        return;
    }

    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    block_job_txn_ref(txn);
}

static void block_job_txn_del_job(BlockJob *job)
{
    if (job->txn) {
        QLIST_REMOVE(job, txn_list);
        block_job_txn_unref(job->txn);
        job->txn = NULL;
    }
}

/* Assumes the block_job_mutex is held */
static bool block_job_timer_pending(BlockJob *job)
{
    return timer_pending(&job->sleep_timer);
}

/* Assumes the block_job_mutex is held */
static bool block_job_timer_not_pending(BlockJob *job)
{
    return !block_job_timer_pending(job);
}

static void block_job_pause(BlockJob *job)
{
    job->pause_count++;
}

static void block_job_resume(BlockJob *job)
{
    assert(job->pause_count > 0);
    job->pause_count--;
    if (job->pause_count) {
        return;
    }

    /* kick only if no timer is pending */
    block_job_enter_cond(job, block_job_timer_not_pending);
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque);
static void block_job_detach_aio_context(void *opaque);

void block_job_free(Job *job)
{
    BlockJob *bjob = container_of(job, BlockJob, job);
    BlockDriverState *bs = blk_bs(bjob->blk);

    assert(!bjob->txn);

    bs->job = NULL;
    block_job_remove_all_bdrv(bjob);
    blk_remove_aio_context_notifier(bjob->blk,
                                    block_job_attached_aio_context,
                                    block_job_detach_aio_context, bjob);
    blk_unref(bjob->blk);
    error_free(bjob->blocker);
    assert(!timer_pending(&bjob->sleep_timer));
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque)
{
    BlockJob *job = opaque;

    job->job.aio_context = new_context;
    if (job->driver->attached_aio_context) {
        job->driver->attached_aio_context(job, new_context);
    }

    block_job_resume(job);
}

static void block_job_drain(BlockJob *job)
{
    /* If the job is not busy (!job->busy), this kicks it into the next pause point. */
    block_job_enter(job);

    blk_drain(job->blk);
    if (job->driver->drain) {
        job->driver->drain(job);
    }
}

static void block_job_detach_aio_context(void *opaque)
{
    BlockJob *job = opaque;

    /* In case the job terminates during aio_poll()... */
    job_ref(&job->job);

    block_job_pause(job);

    while (!job->paused && !job->completed) {
        block_job_drain(job);
    }

    job->job.aio_context = NULL;
    job_unref(&job->job);
}

static char *child_job_get_parent_desc(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    return g_strdup_printf("%s job '%s'", job_type_str(&job->job), job->job.id);
}

static void child_job_drained_begin(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    block_job_pause(job);
}

static void child_job_drained_end(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    block_job_resume(job);
}

static const BdrvChildRole child_job = {
    .get_parent_desc    = child_job_get_parent_desc,
    .drained_begin      = child_job_drained_begin,
    .drained_end        = child_job_drained_end,
    .stay_at_node       = true,
};

void block_job_remove_all_bdrv(BlockJob *job)
{
    GSList *l;
    for (l = job->nodes; l; l = l->next) {
        BdrvChild *c = l->data;
        bdrv_op_unblock_all(c->bs, job->blocker);
        bdrv_root_unref_child(c);
    }
    g_slist_free(job->nodes);
    job->nodes = NULL;
}

int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
                       uint64_t perm, uint64_t shared_perm, Error **errp)
{
    BdrvChild *c;

    c = bdrv_root_attach_child(bs, name, &child_job, perm, shared_perm,
                               job, errp);
    if (c == NULL) {
        return -EPERM;
    }

    job->nodes = g_slist_prepend(job->nodes, c);
    bdrv_ref(bs);
    bdrv_op_block_all(bs, job->blocker);

    return 0;
}

bool block_job_is_internal(BlockJob *job)
{
    return (job->job.id == NULL);
}

static bool block_job_started(BlockJob *job)
{
    return job->co;
}

const BlockJobDriver *block_job_driver(BlockJob *job)
{
    return job->driver;
}

/**
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 */
static void coroutine_fn block_job_co_entry(void *opaque)
{
    BlockJob *job = opaque;

    assert(job && job->driver && job->driver->start);
    block_job_pause_point(job);
    job->driver->start(job);
}

static void block_job_sleep_timer_cb(void *opaque)
{
    BlockJob *job = opaque;

    block_job_enter(job);
}

void block_job_start(BlockJob *job)
{
    assert(job && !block_job_started(job) && job->paused &&
           job->driver && job->driver->start);
    job->co = qemu_coroutine_create(block_job_co_entry, job);
    job->pause_count--;
    job->busy = true;
    job->paused = false;
    job_state_transition(&job->job, JOB_STATUS_RUNNING);
    bdrv_coroutine_enter(blk_bs(job->blk), job->co);
}

static void block_job_decommission(BlockJob *job)
{
    assert(job);
    job->completed = true;
    job->busy = false;
    job->paused = false;
    job->job.deferred_to_main_loop = true;
    block_job_txn_del_job(job);
    job_state_transition(&job->job, JOB_STATUS_NULL);
    job_unref(&job->job);
}

static void block_job_do_dismiss(BlockJob *job)
{
    block_job_decommission(job);
}

static void block_job_conclude(BlockJob *job)
{
    job_state_transition(&job->job, JOB_STATUS_CONCLUDED);
    if (job->auto_dismiss || !block_job_started(job)) {
        block_job_do_dismiss(job);
    }
}

static void block_job_update_rc(BlockJob *job)
{
    if (!job->ret && job_is_cancelled(&job->job)) {
        job->ret = -ECANCELED;
    }
    if (job->ret) {
        job_state_transition(&job->job, JOB_STATUS_ABORTING);
    }
}

static int block_job_prepare(BlockJob *job)
{
    if (job->ret == 0 && job->driver->prepare) {
        job->ret = job->driver->prepare(job);
    }
    return job->ret;
}

static void block_job_commit(BlockJob *job)
{
    assert(!job->ret);
    if (job->driver->commit) {
        job->driver->commit(job);
    }
}

static void block_job_abort(BlockJob *job)
{
    assert(job->ret);
    if (job->driver->abort) {
        job->driver->abort(job);
    }
}

static void block_job_clean(BlockJob *job)
{
    if (job->driver->clean) {
        job->driver->clean(job);
    }
}

static int block_job_finalize_single(BlockJob *job)
{
    assert(job->completed);

    /* Ensure abort is called for late-transactional failures */
    block_job_update_rc(job);

    if (!job->ret) {
        block_job_commit(job);
    } else {
        block_job_abort(job);
    }
    block_job_clean(job);

    if (job->cb) {
        job->cb(job->opaque, job->ret);
    }

    /* Emit events only if we actually started */
    if (block_job_started(job)) {
        if (job_is_cancelled(&job->job)) {
            block_job_event_cancelled(job);
        } else {
            const char *msg = NULL;
            if (job->ret < 0) {
                msg = strerror(-job->ret);
            }
            block_job_event_completed(job, msg);
        }
    }

    block_job_txn_del_job(job);
    block_job_conclude(job);
    return 0;
}

static void block_job_cancel_async(BlockJob *job, bool force)
{
    if (job->iostatus != BLOCK_DEVICE_IO_STATUS_OK) {
        block_job_iostatus_reset(job);
    }
    if (job->user_paused) {
        /* Do not call block_job_enter here, the caller will handle it.  */
        job->user_paused = false;
        job->pause_count--;
    }
    job->job.cancelled = true;
    /* To prevent 'force == false' overriding a previous 'force == true' */
    job->force |= force;
}

static int block_job_txn_apply(BlockJobTxn *txn, int fn(BlockJob *), bool lock)
{
    AioContext *ctx;
    BlockJob *job, *next;
    int rc = 0;

    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
        if (lock) {
            ctx = blk_get_aio_context(job->blk);
            aio_context_acquire(ctx);
        }
        rc = fn(job);
        if (lock) {
            aio_context_release(ctx);
        }
        if (rc) {
            break;
        }
    }
    return rc;
}

static int block_job_finish_sync(BlockJob *job,
                                 void (*finish)(BlockJob *, Error **errp),
                                 Error **errp)
{
    Error *local_err = NULL;
    int ret;

    assert(blk_bs(job->blk)->job == job);

    job_ref(&job->job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        job_unref(&job->job);
        return -EBUSY;
    }
    /* block_job_drain calls block_job_enter, and it should be enough to
     * induce progress until the job completes or moves to the main thread.
     */
    while (!job->job.deferred_to_main_loop && !job->completed) {
        block_job_drain(job);
    }
    while (!job->completed) {
        aio_poll(qemu_get_aio_context(), true);
    }
    ret = (job_is_cancelled(&job->job) && job->ret == 0)
          ? -ECANCELED : job->ret;
    job_unref(&job->job);
    return ret;
}

static void block_job_completed_txn_abort(BlockJob *job)
{
    AioContext *ctx;
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    block_job_txn_ref(txn);

    /* We are the first failed job. Cancel other jobs. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        ctx = blk_get_aio_context(other_job->blk);
        aio_context_acquire(ctx);
    }

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            block_job_cancel_async(other_job, false);
        }
    }
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        ctx = blk_get_aio_context(other_job->blk);
        if (!other_job->completed) {
            assert(job_is_cancelled(&other_job->job));
            block_job_finish_sync(other_job, NULL, NULL);
        }
        block_job_finalize_single(other_job);
        aio_context_release(ctx);
    }

    block_job_txn_unref(txn);
}

static int block_job_needs_finalize(BlockJob *job)
{
    return !job->auto_finalize;
}

static void block_job_do_finalize(BlockJob *job)
{
    int rc;
    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = block_job_txn_apply(job->txn, block_job_prepare, true);
    if (rc) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_txn_apply(job->txn, block_job_finalize_single, true);
    }
}

static void block_job_completed_txn_success(BlockJob *job)
{
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job;

    job_state_transition(&job->job, JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!other_job->completed) {
            return;
        }
        assert(other_job->ret == 0);
    }

    block_job_txn_apply(txn, block_job_event_pending, false);

    /* If no jobs need manual finalization, automatically do so */
    if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) {
        block_job_do_finalize(job);
    }
}

void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
    int64_t old_speed = job->speed;

    if (job_apply_verb(&job->job, JOB_VERB_SET_SPEED, errp)) {
        return;
    }
    if (speed < 0) {
        error_setg(errp, QERR_INVALID_PARAMETER, "speed");
        return;
    }

    ratelimit_set_speed(&job->limit, speed, BLOCK_JOB_SLICE_TIME);

    job->speed = speed;
    if (speed && speed <= old_speed) {
        return;
    }

    /* kick only if a timer is pending */
    block_job_enter_cond(job, block_job_timer_pending);
}

int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
{
    if (!job->speed) {
        return 0;
    }

    return ratelimit_calculate_delay(&job->limit, n);
}
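
/*
 * A driver's main loop typically throttles itself with the function
 * above, roughly like this (sketch only; the real loops live in the
 * individual job drivers):
 *
 *     int64_t delay_ns = block_job_ratelimit_get_delay(job, n);
 *     block_job_sleep_ns(job, delay_ns);
 *     ... process the next n bytes ...
 *
 * block_job_sleep_ns() (defined below) doubles as a cancellation check
 * and pause point, so a throttled loop is automatically pausable.
 */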

void block_job_complete(BlockJob *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->job.id);
    if (job_apply_verb(&job->job, JOB_VERB_COMPLETE, errp)) {
        return;
    }
    if (job->pause_count || job_is_cancelled(&job->job) ||
        !job->driver->complete)
    {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->job.id);
        return;
    }

    job->driver->complete(job, errp);
}

void block_job_finalize(BlockJob *job, Error **errp)
{
    assert(job && job->job.id);
    if (job_apply_verb(&job->job, JOB_VERB_FINALIZE, errp)) {
        return;
    }
    block_job_do_finalize(job);
}

void block_job_dismiss(BlockJob **jobptr, Error **errp)
{
    BlockJob *job = *jobptr;
    /* Similarly to _complete, this is QMP-interface only. */
    assert(job->job.id);
    if (job_apply_verb(&job->job, JOB_VERB_DISMISS, errp)) {
        return;
    }

    block_job_do_dismiss(job);
    *jobptr = NULL;
}

void block_job_user_pause(BlockJob *job, Error **errp)
{
    if (job_apply_verb(&job->job, JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    block_job_pause(job);
}

bool block_job_user_paused(BlockJob *job)
{
    return job->user_paused;
}

void block_job_user_resume(BlockJob *job, Error **errp)
{
    assert(job);
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (job_apply_verb(&job->job, JOB_VERB_RESUME, errp)) {
        return;
    }
    block_job_iostatus_reset(job);
    job->user_paused = false;
    block_job_resume(job);
}

void block_job_cancel(BlockJob *job, bool force)
{
    if (job->job.status == JOB_STATUS_CONCLUDED) {
        block_job_do_dismiss(job);
        return;
    }
    block_job_cancel_async(job, force);
    if (!block_job_started(job)) {
        block_job_completed(job, -ECANCELED);
    } else if (job->job.deferred_to_main_loop) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_enter(job);
    }
}

void block_job_user_cancel(BlockJob *job, bool force, Error **errp)
{
    if (job_apply_verb(&job->job, JOB_VERB_CANCEL, errp)) {
        return;
    }
    block_job_cancel(job, force);
}

/* A wrapper around block_job_cancel() taking an Error ** parameter so it may be
 * used with block_job_finish_sync() without the need for (rather nasty)
 * function pointer casts there. */
static void block_job_cancel_err(BlockJob *job, Error **errp)
{
    block_job_cancel(job, false);
}

int block_job_cancel_sync(BlockJob *job)
{
    return block_job_finish_sync(job, &block_job_cancel_err, NULL);
}

void block_job_cancel_sync_all(void)
{
    BlockJob *job;
    AioContext *aio_context;

    while ((job = block_job_next(NULL))) {
        aio_context = blk_get_aio_context(job->blk);
        aio_context_acquire(aio_context);
        block_job_cancel_sync(job);
        aio_context_release(aio_context);
    }
}

int block_job_complete_sync(BlockJob *job, Error **errp)
{
    return block_job_finish_sync(job, &block_job_complete, errp);
}

void block_job_progress_update(BlockJob *job, uint64_t done)
{
    job->offset += done;
}

void block_job_progress_set_remaining(BlockJob *job, uint64_t remaining)
{
    job->len = job->offset + remaining;
}

BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
{
    BlockJobInfo *info;

    if (block_job_is_internal(job)) {
        error_setg(errp, "Cannot query QEMU internal jobs");
        return NULL;
    }
    info = g_new0(BlockJobInfo, 1);
    info->type      = g_strdup(job_type_str(&job->job));
    info->device    = g_strdup(job->job.id);
    info->len       = job->len;
    info->busy      = atomic_read(&job->busy);
    info->paused    = job->pause_count > 0;
    info->offset    = job->offset;
    info->speed     = job->speed;
    info->io_status = job->iostatus;
    info->ready     = job->ready;
    info->status    = job->job.status;
    info->auto_finalize = job->auto_finalize;
    info->auto_dismiss  = job->auto_dismiss;
    info->has_error = job->ret != 0;
    info->error     = job->ret ? g_strdup(strerror(-job->ret)) : NULL;
    return info;
}

static void block_job_iostatus_set_err(BlockJob *job, int error)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
                                          BLOCK_DEVICE_IO_STATUS_FAILED;
    }
}

static void block_job_event_cancelled(BlockJob *job)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_cancelled(job_type(&job->job),
                                        job->job.id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        &error_abort);
}

static void block_job_event_completed(BlockJob *job, const char *msg)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_completed(job_type(&job->job),
                                        job->job.id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        !!msg,
                                        msg,
                                        &error_abort);
}

static int block_job_event_pending(BlockJob *job)
{
    job_state_transition(&job->job, JOB_STATUS_PENDING);
    if (!job->auto_finalize && !block_job_is_internal(job)) {
        qapi_event_send_block_job_pending(job_type(&job->job),
                                          job->job.id,
                                          &error_abort);
    }
    return 0;
}

/*
 * API for block job drivers and the block layer.  These functions are
 * declared in blockjob_int.h.
 */

void *block_job_create(const char *job_id, const BlockJobDriver *driver,
                       BlockJobTxn *txn, BlockDriverState *bs, uint64_t perm,
                       uint64_t shared_perm, int64_t speed, int flags,
                       BlockCompletionFunc *cb, void *opaque, Error **errp)
{
    BlockBackend *blk;
    BlockJob *job;
    int ret;

    if (bs->job) {
        error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs));
        return NULL;
    }

    if (job_id == NULL && !(flags & BLOCK_JOB_INTERNAL)) {
        job_id = bdrv_get_device_name(bs);
        if (!*job_id) {
            error_setg(errp, "An explicit job ID is required for this node");
            return NULL;
        }
    }

    if (job_id) {
        if (flags & BLOCK_JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal block job");
            return NULL;
        }
    }

    blk = blk_new(perm, shared_perm);
    ret = blk_insert_bs(blk, bs, errp);
    if (ret < 0) {
        blk_unref(blk);
        return NULL;
    }

    job = job_create(job_id, &driver->job_driver, blk_get_aio_context(blk),
                     errp);
    if (job == NULL) {
        blk_unref(blk);
        return NULL;
    }

    assert(is_block_job(&job->job));
    assert(job->job.driver->free == &block_job_free);

    job->driver        = driver;
    job->blk           = blk;
    job->cb            = cb;
    job->opaque        = opaque;
    job->busy          = false;
    job->paused        = true;
    job->pause_count   = 1;
    job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE);
    job->auto_dismiss  = !(flags & BLOCK_JOB_MANUAL_DISMISS);
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   block_job_sleep_timer_cb, job);

    error_setg(&job->blocker, "block device is in use by block job: %s",
               job_type_str(&job->job));
    block_job_add_bdrv(job, "main node", bs, 0, BLK_PERM_ALL, &error_abort);
    bs->job = job;

    bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);

    blk_add_aio_context_notifier(blk, block_job_attached_aio_context,
                                 block_job_detach_aio_context, job);

    /* Only set speed when necessary to avoid NotSupported error */
    if (speed != 0) {
        Error *local_err = NULL;

        block_job_set_speed(job, speed, &local_err);
        if (local_err) {
            block_job_early_fail(job);
            error_propagate(errp, local_err);
            return NULL;
        }
    }

    /* Single jobs are modeled as single-job transactions for the sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = block_job_txn_new();
        block_job_txn_add_job(txn, job);
        block_job_txn_unref(txn);
    } else {
        block_job_txn_add_job(txn, job);
    }

    return job;
}
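
/*
 * Taken together, a job driver's creation path is roughly as follows
 * (sketch with a hypothetical driver and callback; see the real
 * stream/commit/mirror/backup drivers for actual uses):
 *
 *     s = block_job_create("job0", &my_job_driver, NULL, bs, perm,
 *                          shared_perm, speed, BLOCK_JOB_DEFAULT,
 *                          my_completion_cb, opaque, errp);
 *     if (!s) {
 *         return;
 *     }
 *     ... driver-specific setup; call block_job_early_fail(s) on error ...
 *     block_job_start(s);
 */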

void block_job_early_fail(BlockJob *job)
{
    assert(job->job.status == JOB_STATUS_CREATED);
    block_job_decommission(job);
}

void block_job_completed(BlockJob *job, int ret)
{
    assert(job && job->txn && !job->completed);
    assert(blk_bs(job->blk)->job == job);
    job->completed = true;
    job->ret = ret;
    block_job_update_rc(job);
    trace_block_job_completed(job, ret, job->ret);
    if (job->ret) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_completed_txn_success(job);
    }
}

static bool block_job_should_pause(BlockJob *job)
{
    return job->pause_count > 0;
}

/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with block_job_enter() before the timer has
 * expired is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
 * called explicitly. */
static void block_job_do_yield(BlockJob *job, uint64_t ns)
{
    block_job_lock();
    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    job->busy = false;
    block_job_unlock();
    qemu_coroutine_yield();

    /* Set by block_job_enter before re-entering the coroutine.  */
    assert(job->busy);
}
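
/*
 * The busy/sleep_timer handshake between this function and
 * block_job_enter_cond() below is serialized by block_job_mutex:
 *
 *     job coroutine                      monitor/timer callback
 *     -------------                      ----------------------
 *     lock; timer_mod(); busy = false;   lock
 *     unlock                             if busy or fn() fails: unlock, done
 *     qemu_coroutine_yield()             timer_del(); busy = true; unlock
 *     ...                                aio_co_wake(job->co)
 *     assert(job->busy)  // resumed
 */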

void coroutine_fn block_job_pause_point(BlockJob *job)
{
    assert(job && block_job_started(job));

    if (!block_job_should_pause(job)) {
        return;
    }
    if (job_is_cancelled(&job->job)) {
        return;
    }

    if (job->driver->pause) {
        job->driver->pause(job);
    }

    if (block_job_should_pause(job) && !job_is_cancelled(&job->job)) {
        JobStatus status = job->job.status;
        job_state_transition(&job->job, status == JOB_STATUS_READY
                                        ? JOB_STATUS_STANDBY
                                        : JOB_STATUS_PAUSED);
        job->paused = true;
        block_job_do_yield(job, -1);
        job->paused = false;
        job_state_transition(&job->job, status);
    }

    if (job->driver->resume) {
        job->driver->resume(job);
    }
}

/*
 * Conditionally enter the block job coroutine.  The job is entered only
 * if it is not busy and, when @fn is non-NULL, only if fn() returns true
 * under the block_job_lock critical section.
 */
static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job))
{
    if (!block_job_started(job)) {
        return;
    }
    if (job->job.deferred_to_main_loop) {
        return;
    }

    block_job_lock();
    if (job->busy) {
        block_job_unlock();
        return;
    }

    if (fn && !fn(job)) {
        block_job_unlock();
        return;
    }

    assert(!job->job.deferred_to_main_loop);
    timer_del(&job->sleep_timer);
    job->busy = true;
    block_job_unlock();
    aio_co_wake(job->co);
}

void block_job_enter(BlockJob *job)
{
    block_job_enter_cond(job, NULL);
}

void block_job_sleep_ns(BlockJob *job, int64_t ns)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled(&job->job)) {
        return;
    }

    if (!block_job_should_pause(job)) {
        block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    block_job_pause_point(job);
}

void block_job_yield(BlockJob *job)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled(&job->job)) {
        return;
    }

    if (!block_job_should_pause(job)) {
        block_job_do_yield(job, -1);
    }

    block_job_pause_point(job);
}

void block_job_iostatus_reset(BlockJob *job)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        return;
    }
    assert(job->user_paused && job->pause_count > 0);
    job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
}

void block_job_event_ready(BlockJob *job)
{
    job_state_transition(&job->job, JOB_STATUS_READY);
    job->ready = true;

    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_ready(job_type(&job->job),
                                    job->job.id,
                                    job->len,
                                    job->offset,
                                    job->speed, &error_abort);
}

BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                        int is_read, int error)
{
    BlockErrorAction action;

    switch (on_err) {
    case BLOCKDEV_ON_ERROR_ENOSPC:
    case BLOCKDEV_ON_ERROR_AUTO:
        action = (error == ENOSPC) ?
                 BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_STOP:
        action = BLOCK_ERROR_ACTION_STOP;
        break;
    case BLOCKDEV_ON_ERROR_REPORT:
        action = BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_IGNORE:
        action = BLOCK_ERROR_ACTION_IGNORE;
        break;
    default:
        abort();
    }
    if (!block_job_is_internal(job)) {
        qapi_event_send_block_job_error(job->job.id,
                                        is_read ? IO_OPERATION_TYPE_READ :
                                        IO_OPERATION_TYPE_WRITE,
                                        action, &error_abort);
    }
    if (action == BLOCK_ERROR_ACTION_STOP) {
        block_job_pause(job);
        /* Make the pause user visible, so that it can be resumed from QMP. */
        job->user_paused = true;
        block_job_iostatus_set_err(job, error);
    }
    return action;
}
1169