xref: /qemu/job.c (revision fb72e779)
1 /*
2  * Background jobs (long-running operations)
3  *
4  * Copyright (c) 2011 IBM Corp.
5  * Copyright (c) 2012, 2018 Red Hat, Inc.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23  * THE SOFTWARE.
24  */
25 
26 #include "qemu/osdep.h"
27 #include "qapi/error.h"
28 #include "qemu/job.h"
29 #include "qemu/id.h"
30 #include "qemu/main-loop.h"
31 #include "block/aio-wait.h"
32 #include "trace/trace-root.h"
33 #include "qapi/qapi-events-job.h"
34 
35 /*
36  * The job API is composed of two categories of functions.
37  *
38  * The first includes functions used by the monitor.  The monitor is
39  * peculiar in that it accesses the job list with job_get, and
40  * therefore needs consistency across job_get and the actual operation
41  * (e.g. job_user_cancel). To achieve this consistency, the caller
42  * calls job_lock/job_unlock itself around the whole operation.
43  *
44  *
45  * The second includes functions used by the job drivers and sometimes
46  * by the core block layer. These delegate the locking to the callee instead.
47  */
48 
/*
 * job_mutex protects the jobs list, but also makes the
 * struct job fields thread-safe.
 *
 * It is taken with job_lock() and released with job_unlock().
 */
QemuMutex job_mutex;

/* List of all jobs, linked through Job.job_list.  Protected by job_mutex */
static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);
57 
/*
 * Job State Transition Table
 *
 * JobSTT[s0][s1] is true when a job whose current status is s0 may move to
 * status s1.  job_state_transition_locked() asserts on this table before
 * updating job->status.  Rows are the current status, columns the new
 * status; the one-letter legend labels both.
 */
bool JobSTT[JOB_STATUS__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
};
73 
/*
 * Job Verb Table
 *
 * JobVerbTable[verb][status] is true when @verb may be applied to a job that
 * is currently in @status.  job_apply_verb_locked() consults this table and
 * returns -EPERM for a false entry.  Columns follow the one-letter status
 * legend in the first comment line of the initializer.
 */
bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    [JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
    [JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
};
84 
/*
 * Transactional group of jobs.
 *
 * Created by job_txn_new() with a reference count of 1 and freed by
 * job_txn_unref_locked() when the count reaches zero.
 */
struct JobTxn {

    /*
     * Is this txn being cancelled?  Set by job_completed_txn_abort_locked();
     * a later call that finds it already set returns immediately.
     */
    bool aborting;

    /* List of jobs in this transaction, linked through Job.txn_list */
    QLIST_HEAD(, Job) jobs;

    /* Reference count; every job added to the txn holds one reference */
    int refcnt;
};
97 
98 void job_lock(void)
99 {
100     qemu_mutex_lock(&job_mutex);
101 }
102 
103 void job_unlock(void)
104 {
105     qemu_mutex_unlock(&job_mutex);
106 }
107 
108 static void __attribute__((__constructor__)) job_init(void)
109 {
110     qemu_mutex_init(&job_mutex);
111 }
112 
113 JobTxn *job_txn_new(void)
114 {
115     JobTxn *txn = g_new0(JobTxn, 1);
116     QLIST_INIT(&txn->jobs);
117     txn->refcnt = 1;
118     return txn;
119 }
120 
121 /* Called with job_mutex held. */
122 static void job_txn_ref_locked(JobTxn *txn)
123 {
124     txn->refcnt++;
125 }
126 
127 void job_txn_unref_locked(JobTxn *txn)
128 {
129     if (txn && --txn->refcnt == 0) {
130         g_free(txn);
131     }
132 }
133 
134 void job_txn_unref(JobTxn *txn)
135 {
136     JOB_LOCK_GUARD();
137     job_txn_unref_locked(txn);
138 }
139 
140 /**
141  * @txn: The transaction (may be NULL)
142  * @job: Job to add to the transaction
143  *
144  * Add @job to the transaction.  The @job must not already be in a transaction.
145  * The caller must call either job_txn_unref() or job_completed() to release
146  * the reference that is automatically grabbed here.
147  *
148  * If @txn is NULL, the function does nothing.
149  *
150  * Called with job_mutex held.
151  */
152 static void job_txn_add_job_locked(JobTxn *txn, Job *job)
153 {
154     if (!txn) {
155         return;
156     }
157 
158     assert(!job->txn);
159     job->txn = txn;
160 
161     QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
162     job_txn_ref_locked(txn);
163 }
164 
165 /* Called with job_mutex held. */
166 static void job_txn_del_job_locked(Job *job)
167 {
168     if (job->txn) {
169         QLIST_REMOVE(job, txn_list);
170         job_txn_unref_locked(job->txn);
171         job->txn = NULL;
172     }
173 }
174 
175 /* Called with job_mutex held, but releases it temporarily. */
176 static int job_txn_apply_locked(Job *job, int fn(Job *))
177 {
178     Job *other_job, *next;
179     JobTxn *txn = job->txn;
180     int rc = 0;
181 
182     /*
183      * Similar to job_completed_txn_abort, we take each job's lock before
184      * applying fn, but since we assume that outer_ctx is held by the caller,
185      * we need to release it here to avoid holding the lock twice - which would
186      * break AIO_WAIT_WHILE from within fn.
187      */
188     job_ref_locked(job);
189 
190     QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
191         rc = fn(other_job);
192         if (rc) {
193             break;
194         }
195     }
196 
197     job_unref_locked(job);
198     return rc;
199 }
200 
201 bool job_is_internal(Job *job)
202 {
203     return (job->id == NULL);
204 }
205 
206 /* Called with job_mutex held. */
207 static void job_state_transition_locked(Job *job, JobStatus s1)
208 {
209     JobStatus s0 = job->status;
210     assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
211     trace_job_state_transition(job, job->ret,
212                                JobSTT[s0][s1] ? "allowed" : "disallowed",
213                                JobStatus_str(s0), JobStatus_str(s1));
214     assert(JobSTT[s0][s1]);
215     job->status = s1;
216 
217     if (!job_is_internal(job) && s1 != s0) {
218         qapi_event_send_job_status_change(job->id, job->status);
219     }
220 }
221 
222 int job_apply_verb_locked(Job *job, JobVerb verb, Error **errp)
223 {
224     JobStatus s0 = job->status;
225     assert(verb >= 0 && verb < JOB_VERB__MAX);
226     trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb),
227                          JobVerbTable[verb][s0] ? "allowed" : "prohibited");
228     if (JobVerbTable[verb][s0]) {
229         return 0;
230     }
231     error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
232                job->id, JobStatus_str(s0), JobVerb_str(verb));
233     return -EPERM;
234 }
235 
236 JobType job_type(const Job *job)
237 {
238     return job->driver->job_type;
239 }
240 
241 const char *job_type_str(const Job *job)
242 {
243     return JobType_str(job_type(job));
244 }
245 
246 bool job_is_cancelled_locked(Job *job)
247 {
248     /* force_cancel may be true only if cancelled is true, too */
249     assert(job->cancelled || !job->force_cancel);
250     return job->force_cancel;
251 }
252 
253 bool job_is_cancelled(Job *job)
254 {
255     JOB_LOCK_GUARD();
256     return job_is_cancelled_locked(job);
257 }
258 
259 /* Called with job_mutex held. */
260 static bool job_cancel_requested_locked(Job *job)
261 {
262     return job->cancelled;
263 }
264 
265 bool job_cancel_requested(Job *job)
266 {
267     JOB_LOCK_GUARD();
268     return job_cancel_requested_locked(job);
269 }
270 
271 bool job_is_ready_locked(Job *job)
272 {
273     switch (job->status) {
274     case JOB_STATUS_UNDEFINED:
275     case JOB_STATUS_CREATED:
276     case JOB_STATUS_RUNNING:
277     case JOB_STATUS_PAUSED:
278     case JOB_STATUS_WAITING:
279     case JOB_STATUS_PENDING:
280     case JOB_STATUS_ABORTING:
281     case JOB_STATUS_CONCLUDED:
282     case JOB_STATUS_NULL:
283         return false;
284     case JOB_STATUS_READY:
285     case JOB_STATUS_STANDBY:
286         return true;
287     default:
288         g_assert_not_reached();
289     }
290     return false;
291 }
292 
293 bool job_is_ready(Job *job)
294 {
295     JOB_LOCK_GUARD();
296     return job_is_ready_locked(job);
297 }
298 
299 bool job_is_completed_locked(Job *job)
300 {
301     switch (job->status) {
302     case JOB_STATUS_UNDEFINED:
303     case JOB_STATUS_CREATED:
304     case JOB_STATUS_RUNNING:
305     case JOB_STATUS_PAUSED:
306     case JOB_STATUS_READY:
307     case JOB_STATUS_STANDBY:
308         return false;
309     case JOB_STATUS_WAITING:
310     case JOB_STATUS_PENDING:
311     case JOB_STATUS_ABORTING:
312     case JOB_STATUS_CONCLUDED:
313     case JOB_STATUS_NULL:
314         return true;
315     default:
316         g_assert_not_reached();
317     }
318     return false;
319 }
320 
321 static bool job_is_completed(Job *job)
322 {
323     JOB_LOCK_GUARD();
324     return job_is_completed_locked(job);
325 }
326 
327 static bool job_started_locked(Job *job)
328 {
329     return job->co;
330 }
331 
332 /* Called with job_mutex held. */
333 static bool job_should_pause_locked(Job *job)
334 {
335     return job->pause_count > 0;
336 }
337 
338 Job *job_next_locked(Job *job)
339 {
340     if (!job) {
341         return QLIST_FIRST(&jobs);
342     }
343     return QLIST_NEXT(job, job_list);
344 }
345 
346 Job *job_next(Job *job)
347 {
348     JOB_LOCK_GUARD();
349     return job_next_locked(job);
350 }
351 
352 Job *job_get_locked(const char *id)
353 {
354     Job *job;
355 
356     QLIST_FOREACH(job, &jobs, job_list) {
357         if (job->id && !strcmp(id, job->id)) {
358             return job;
359         }
360     }
361 
362     return NULL;
363 }
364 
365 void job_set_aio_context(Job *job, AioContext *ctx)
366 {
367     /* protect against read in job_finish_sync_locked and job_start */
368     GLOBAL_STATE_CODE();
369     /* protect against read in job_do_yield_locked */
370     JOB_LOCK_GUARD();
371     /* ensure the job is quiescent while the AioContext is changed */
372     assert(job->paused || job_is_completed_locked(job));
373     job->aio_context = ctx;
374 }
375 
376 /* Called with job_mutex *not* held. */
377 static void job_sleep_timer_cb(void *opaque)
378 {
379     Job *job = opaque;
380 
381     job_enter(job);
382 }
383 
384 void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
385                  AioContext *ctx, int flags, BlockCompletionFunc *cb,
386                  void *opaque, Error **errp)
387 {
388     Job *job;
389 
390     JOB_LOCK_GUARD();
391 
392     if (job_id) {
393         if (flags & JOB_INTERNAL) {
394             error_setg(errp, "Cannot specify job ID for internal job");
395             return NULL;
396         }
397         if (!id_wellformed(job_id)) {
398             error_setg(errp, "Invalid job ID '%s'", job_id);
399             return NULL;
400         }
401         if (job_get_locked(job_id)) {
402             error_setg(errp, "Job ID '%s' already in use", job_id);
403             return NULL;
404         }
405     } else if (!(flags & JOB_INTERNAL)) {
406         error_setg(errp, "An explicit job ID is required");
407         return NULL;
408     }
409 
410     job = g_malloc0(driver->instance_size);
411     job->driver        = driver;
412     job->id            = g_strdup(job_id);
413     job->refcnt        = 1;
414     job->aio_context   = ctx;
415     job->busy          = false;
416     job->paused        = true;
417     job->pause_count   = 1;
418     job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
419     job->auto_dismiss  = !(flags & JOB_MANUAL_DISMISS);
420     job->cb            = cb;
421     job->opaque        = opaque;
422 
423     progress_init(&job->progress);
424 
425     notifier_list_init(&job->on_finalize_cancelled);
426     notifier_list_init(&job->on_finalize_completed);
427     notifier_list_init(&job->on_pending);
428     notifier_list_init(&job->on_ready);
429     notifier_list_init(&job->on_idle);
430 
431     job_state_transition_locked(job, JOB_STATUS_CREATED);
432     aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
433                    QEMU_CLOCK_REALTIME, SCALE_NS,
434                    job_sleep_timer_cb, job);
435 
436     QLIST_INSERT_HEAD(&jobs, job, job_list);
437 
438     /* Single jobs are modeled as single-job transactions for sake of
439      * consolidating the job management logic */
440     if (!txn) {
441         txn = job_txn_new();
442         job_txn_add_job_locked(txn, job);
443         job_txn_unref_locked(txn);
444     } else {
445         job_txn_add_job_locked(txn, job);
446     }
447 
448     return job;
449 }
450 
451 void job_ref_locked(Job *job)
452 {
453     ++job->refcnt;
454 }
455 
456 void job_unref_locked(Job *job)
457 {
458     GLOBAL_STATE_CODE();
459 
460     if (--job->refcnt == 0) {
461         assert(job->status == JOB_STATUS_NULL);
462         assert(!timer_pending(&job->sleep_timer));
463         assert(!job->txn);
464 
465         if (job->driver->free) {
466             AioContext *aio_context = job->aio_context;
467             job_unlock();
468             /* FIXME: aiocontext lock is required because cb calls blk_unref */
469             aio_context_acquire(aio_context);
470             job->driver->free(job);
471             aio_context_release(aio_context);
472             job_lock();
473         }
474 
475         QLIST_REMOVE(job, job_list);
476 
477         progress_destroy(&job->progress);
478         error_free(job->err);
479         g_free(job->id);
480         g_free(job);
481     }
482 }
483 
484 void job_progress_update(Job *job, uint64_t done)
485 {
486     progress_work_done(&job->progress, done);
487 }
488 
489 void job_progress_set_remaining(Job *job, uint64_t remaining)
490 {
491     progress_set_remaining(&job->progress, remaining);
492 }
493 
494 void job_progress_increase_remaining(Job *job, uint64_t delta)
495 {
496     progress_increase_remaining(&job->progress, delta);
497 }
498 
499 /**
500  * To be called when a cancelled job is finalised.
501  * Called with job_mutex held.
502  */
503 static void job_event_cancelled_locked(Job *job)
504 {
505     notifier_list_notify(&job->on_finalize_cancelled, job);
506 }
507 
508 /**
509  * To be called when a successfully completed job is finalised.
510  * Called with job_mutex held.
511  */
512 static void job_event_completed_locked(Job *job)
513 {
514     notifier_list_notify(&job->on_finalize_completed, job);
515 }
516 
517 /* Called with job_mutex held. */
518 static void job_event_pending_locked(Job *job)
519 {
520     notifier_list_notify(&job->on_pending, job);
521 }
522 
523 /* Called with job_mutex held. */
524 static void job_event_ready_locked(Job *job)
525 {
526     notifier_list_notify(&job->on_ready, job);
527 }
528 
529 /* Called with job_mutex held. */
530 static void job_event_idle_locked(Job *job)
531 {
532     notifier_list_notify(&job->on_idle, job);
533 }
534 
535 void job_enter_cond_locked(Job *job, bool(*fn)(Job *job))
536 {
537     if (!job_started_locked(job)) {
538         return;
539     }
540     if (job->deferred_to_main_loop) {
541         return;
542     }
543 
544     if (job->busy) {
545         return;
546     }
547 
548     if (fn && !fn(job)) {
549         return;
550     }
551 
552     assert(!job->deferred_to_main_loop);
553     timer_del(&job->sleep_timer);
554     job->busy = true;
555     job_unlock();
556     aio_co_wake(job->co);
557     job_lock();
558 }
559 
560 void job_enter(Job *job)
561 {
562     JOB_LOCK_GUARD();
563     job_enter_cond_locked(job, NULL);
564 }
565 
566 /* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
567  * Reentering the job coroutine with job_enter() before the timer has expired
568  * is allowed and cancels the timer.
569  *
570  * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
571  * called explicitly.
572  *
573  * Called with job_mutex held, but releases it temporarily.
574  */
575 static void coroutine_fn job_do_yield_locked(Job *job, uint64_t ns)
576 {
577     AioContext *next_aio_context;
578 
579     if (ns != -1) {
580         timer_mod(&job->sleep_timer, ns);
581     }
582     job->busy = false;
583     job_event_idle_locked(job);
584     job_unlock();
585     qemu_coroutine_yield();
586     job_lock();
587 
588     next_aio_context = job->aio_context;
589     /*
590      * Coroutine has resumed, but in the meanwhile the job AioContext
591      * might have changed via bdrv_try_change_aio_context(), so we need to move
592      * the coroutine too in the new aiocontext.
593      */
594     while (qemu_get_current_aio_context() != next_aio_context) {
595         job_unlock();
596         aio_co_reschedule_self(next_aio_context);
597         job_lock();
598         next_aio_context = job->aio_context;
599     }
600 
601     /* Set by job_enter_cond_locked() before re-entering the coroutine.  */
602     assert(job->busy);
603 }
604 
605 /* Called with job_mutex held, but releases it temporarily. */
606 static void coroutine_fn job_pause_point_locked(Job *job)
607 {
608     assert(job && job_started_locked(job));
609 
610     if (!job_should_pause_locked(job)) {
611         return;
612     }
613     if (job_is_cancelled_locked(job)) {
614         return;
615     }
616 
617     if (job->driver->pause) {
618         job_unlock();
619         job->driver->pause(job);
620         job_lock();
621     }
622 
623     if (job_should_pause_locked(job) && !job_is_cancelled_locked(job)) {
624         JobStatus status = job->status;
625         job_state_transition_locked(job, status == JOB_STATUS_READY
626                                     ? JOB_STATUS_STANDBY
627                                     : JOB_STATUS_PAUSED);
628         job->paused = true;
629         job_do_yield_locked(job, -1);
630         job->paused = false;
631         job_state_transition_locked(job, status);
632     }
633 
634     if (job->driver->resume) {
635         job_unlock();
636         job->driver->resume(job);
637         job_lock();
638     }
639 }
640 
641 void coroutine_fn job_pause_point(Job *job)
642 {
643     JOB_LOCK_GUARD();
644     job_pause_point_locked(job);
645 }
646 
647 void coroutine_fn job_yield(Job *job)
648 {
649     JOB_LOCK_GUARD();
650     assert(job->busy);
651 
652     /* Check cancellation *before* setting busy = false, too!  */
653     if (job_is_cancelled_locked(job)) {
654         return;
655     }
656 
657     if (!job_should_pause_locked(job)) {
658         job_do_yield_locked(job, -1);
659     }
660 
661     job_pause_point_locked(job);
662 }
663 
664 void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
665 {
666     JOB_LOCK_GUARD();
667     assert(job->busy);
668 
669     /* Check cancellation *before* setting busy = false, too!  */
670     if (job_is_cancelled_locked(job)) {
671         return;
672     }
673 
674     if (!job_should_pause_locked(job)) {
675         job_do_yield_locked(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
676     }
677 
678     job_pause_point_locked(job);
679 }
680 
681 /* Assumes the job_mutex is held */
682 static bool job_timer_not_pending_locked(Job *job)
683 {
684     return !timer_pending(&job->sleep_timer);
685 }
686 
687 void job_pause_locked(Job *job)
688 {
689     job->pause_count++;
690     if (!job->paused) {
691         job_enter_cond_locked(job, NULL);
692     }
693 }
694 
695 void job_pause(Job *job)
696 {
697     JOB_LOCK_GUARD();
698     job_pause_locked(job);
699 }
700 
701 void job_resume_locked(Job *job)
702 {
703     assert(job->pause_count > 0);
704     job->pause_count--;
705     if (job->pause_count) {
706         return;
707     }
708 
709     /* kick only if no timer is pending */
710     job_enter_cond_locked(job, job_timer_not_pending_locked);
711 }
712 
713 void job_resume(Job *job)
714 {
715     JOB_LOCK_GUARD();
716     job_resume_locked(job);
717 }
718 
719 void job_user_pause_locked(Job *job, Error **errp)
720 {
721     if (job_apply_verb_locked(job, JOB_VERB_PAUSE, errp)) {
722         return;
723     }
724     if (job->user_paused) {
725         error_setg(errp, "Job is already paused");
726         return;
727     }
728     job->user_paused = true;
729     job_pause_locked(job);
730 }
731 
732 bool job_user_paused_locked(Job *job)
733 {
734     return job->user_paused;
735 }
736 
737 void job_user_resume_locked(Job *job, Error **errp)
738 {
739     assert(job);
740     GLOBAL_STATE_CODE();
741     if (!job->user_paused || job->pause_count <= 0) {
742         error_setg(errp, "Can't resume a job that was not paused");
743         return;
744     }
745     if (job_apply_verb_locked(job, JOB_VERB_RESUME, errp)) {
746         return;
747     }
748     if (job->driver->user_resume) {
749         job_unlock();
750         job->driver->user_resume(job);
751         job_lock();
752     }
753     job->user_paused = false;
754     job_resume_locked(job);
755 }
756 
757 /* Called with job_mutex held, but releases it temporarily. */
758 static void job_do_dismiss_locked(Job *job)
759 {
760     assert(job);
761     job->busy = false;
762     job->paused = false;
763     job->deferred_to_main_loop = true;
764 
765     job_txn_del_job_locked(job);
766 
767     job_state_transition_locked(job, JOB_STATUS_NULL);
768     job_unref_locked(job);
769 }
770 
771 void job_dismiss_locked(Job **jobptr, Error **errp)
772 {
773     Job *job = *jobptr;
774     /* similarly to _complete, this is QMP-interface only. */
775     assert(job->id);
776     if (job_apply_verb_locked(job, JOB_VERB_DISMISS, errp)) {
777         return;
778     }
779 
780     job_do_dismiss_locked(job);
781     *jobptr = NULL;
782 }
783 
784 void job_early_fail(Job *job)
785 {
786     JOB_LOCK_GUARD();
787     assert(job->status == JOB_STATUS_CREATED);
788     job_do_dismiss_locked(job);
789 }
790 
791 /* Called with job_mutex held. */
792 static void job_conclude_locked(Job *job)
793 {
794     job_state_transition_locked(job, JOB_STATUS_CONCLUDED);
795     if (job->auto_dismiss || !job_started_locked(job)) {
796         job_do_dismiss_locked(job);
797     }
798 }
799 
800 /* Called with job_mutex held. */
801 static void job_update_rc_locked(Job *job)
802 {
803     if (!job->ret && job_is_cancelled_locked(job)) {
804         job->ret = -ECANCELED;
805     }
806     if (job->ret) {
807         if (!job->err) {
808             error_setg(&job->err, "%s", strerror(-job->ret));
809         }
810         job_state_transition_locked(job, JOB_STATUS_ABORTING);
811     }
812 }
813 
814 static void job_commit(Job *job)
815 {
816     assert(!job->ret);
817     GLOBAL_STATE_CODE();
818     if (job->driver->commit) {
819         job->driver->commit(job);
820     }
821 }
822 
823 static void job_abort(Job *job)
824 {
825     assert(job->ret);
826     GLOBAL_STATE_CODE();
827     if (job->driver->abort) {
828         job->driver->abort(job);
829     }
830 }
831 
832 static void job_clean(Job *job)
833 {
834     GLOBAL_STATE_CODE();
835     if (job->driver->clean) {
836         job->driver->clean(job);
837     }
838 }
839 
840 /*
841  * Called with job_mutex held, but releases it temporarily.
842  * Takes AioContext lock internally to invoke a job->driver callback.
843  */
844 static int job_finalize_single_locked(Job *job)
845 {
846     int job_ret;
847     AioContext *ctx = job->aio_context;
848 
849     assert(job_is_completed_locked(job));
850 
851     /* Ensure abort is called for late-transactional failures */
852     job_update_rc_locked(job);
853 
854     job_ret = job->ret;
855     job_unlock();
856     aio_context_acquire(ctx);
857 
858     if (!job_ret) {
859         job_commit(job);
860     } else {
861         job_abort(job);
862     }
863     job_clean(job);
864 
865     if (job->cb) {
866         job->cb(job->opaque, job_ret);
867     }
868 
869     aio_context_release(ctx);
870     job_lock();
871 
872     /* Emit events only if we actually started */
873     if (job_started_locked(job)) {
874         if (job_is_cancelled_locked(job)) {
875             job_event_cancelled_locked(job);
876         } else {
877             job_event_completed_locked(job);
878         }
879     }
880 
881     job_txn_del_job_locked(job);
882     job_conclude_locked(job);
883     return 0;
884 }
885 
886 /*
887  * Called with job_mutex held, but releases it temporarily.
888  * Takes AioContext lock internally to invoke a job->driver callback.
889  */
890 static void job_cancel_async_locked(Job *job, bool force)
891 {
892     AioContext *ctx = job->aio_context;
893     GLOBAL_STATE_CODE();
894     if (job->driver->cancel) {
895         job_unlock();
896         aio_context_acquire(ctx);
897         force = job->driver->cancel(job, force);
898         aio_context_release(ctx);
899         job_lock();
900     } else {
901         /* No .cancel() means the job will behave as if force-cancelled */
902         force = true;
903     }
904 
905     if (job->user_paused) {
906         /* Do not call job_enter here, the caller will handle it.  */
907         if (job->driver->user_resume) {
908             job_unlock();
909             job->driver->user_resume(job);
910             job_lock();
911         }
912         job->user_paused = false;
913         assert(job->pause_count > 0);
914         job->pause_count--;
915     }
916 
917     /*
918      * Ignore soft cancel requests after the job is already done
919      * (We will still invoke job->driver->cancel() above, but if the
920      * job driver supports soft cancelling and the job is done, that
921      * should be a no-op, too.  We still call it so it can override
922      * @force.)
923      */
924     if (force || !job->deferred_to_main_loop) {
925         job->cancelled = true;
926         /* To prevent 'force == false' overriding a previous 'force == true' */
927         job->force_cancel |= force;
928     }
929 }
930 
931 /*
932  * Called with job_mutex held, but releases it temporarily.
933  * Takes AioContext lock internally to invoke a job->driver callback.
934  */
935 static void job_completed_txn_abort_locked(Job *job)
936 {
937     JobTxn *txn = job->txn;
938     Job *other_job;
939 
940     if (txn->aborting) {
941         /*
942          * We are cancelled by another job, which will handle everything.
943          */
944         return;
945     }
946     txn->aborting = true;
947     job_txn_ref_locked(txn);
948 
949     job_ref_locked(job);
950 
951     /* Other jobs are effectively cancelled by us, set the status for
952      * them; this job, however, may or may not be cancelled, depending
953      * on the caller, so leave it. */
954     QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
955         if (other_job != job) {
956             /*
957              * This is a transaction: If one job failed, no result will matter.
958              * Therefore, pass force=true to terminate all other jobs as quickly
959              * as possible.
960              */
961             job_cancel_async_locked(other_job, true);
962         }
963     }
964     while (!QLIST_EMPTY(&txn->jobs)) {
965         other_job = QLIST_FIRST(&txn->jobs);
966         if (!job_is_completed_locked(other_job)) {
967             assert(job_cancel_requested_locked(other_job));
968             job_finish_sync_locked(other_job, NULL, NULL);
969         }
970         job_finalize_single_locked(other_job);
971     }
972 
973     job_unref_locked(job);
974     job_txn_unref_locked(txn);
975 }
976 
977 /* Called with job_mutex held, but releases it temporarily */
978 static int job_prepare_locked(Job *job)
979 {
980     int ret;
981     AioContext *ctx = job->aio_context;
982 
983     GLOBAL_STATE_CODE();
984 
985     if (job->ret == 0 && job->driver->prepare) {
986         job_unlock();
987         aio_context_acquire(ctx);
988         ret = job->driver->prepare(job);
989         aio_context_release(ctx);
990         job_lock();
991         job->ret = ret;
992         job_update_rc_locked(job);
993     }
994 
995     return job->ret;
996 }
997 
998 /* Called with job_mutex held */
999 static int job_needs_finalize_locked(Job *job)
1000 {
1001     return !job->auto_finalize;
1002 }
1003 
1004 /* Called with job_mutex held */
1005 static void job_do_finalize_locked(Job *job)
1006 {
1007     int rc;
1008     assert(job && job->txn);
1009 
1010     /* prepare the transaction to complete */
1011     rc = job_txn_apply_locked(job, job_prepare_locked);
1012     if (rc) {
1013         job_completed_txn_abort_locked(job);
1014     } else {
1015         job_txn_apply_locked(job, job_finalize_single_locked);
1016     }
1017 }
1018 
1019 void job_finalize_locked(Job *job, Error **errp)
1020 {
1021     assert(job && job->id);
1022     if (job_apply_verb_locked(job, JOB_VERB_FINALIZE, errp)) {
1023         return;
1024     }
1025     job_do_finalize_locked(job);
1026 }
1027 
1028 /* Called with job_mutex held. */
1029 static int job_transition_to_pending_locked(Job *job)
1030 {
1031     job_state_transition_locked(job, JOB_STATUS_PENDING);
1032     if (!job->auto_finalize) {
1033         job_event_pending_locked(job);
1034     }
1035     return 0;
1036 }
1037 
1038 void job_transition_to_ready(Job *job)
1039 {
1040     JOB_LOCK_GUARD();
1041     job_state_transition_locked(job, JOB_STATUS_READY);
1042     job_event_ready_locked(job);
1043 }
1044 
1045 /* Called with job_mutex held. */
1046 static void job_completed_txn_success_locked(Job *job)
1047 {
1048     JobTxn *txn = job->txn;
1049     Job *other_job;
1050 
1051     job_state_transition_locked(job, JOB_STATUS_WAITING);
1052 
1053     /*
1054      * Successful completion, see if there are other running jobs in this
1055      * txn.
1056      */
1057     QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
1058         if (!job_is_completed_locked(other_job)) {
1059             return;
1060         }
1061         assert(other_job->ret == 0);
1062     }
1063 
1064     job_txn_apply_locked(job, job_transition_to_pending_locked);
1065 
1066     /* If no jobs need manual finalization, automatically do so */
1067     if (job_txn_apply_locked(job, job_needs_finalize_locked) == 0) {
1068         job_do_finalize_locked(job);
1069     }
1070 }
1071 
1072 /* Called with job_mutex held. */
1073 static void job_completed_locked(Job *job)
1074 {
1075     assert(job && job->txn && !job_is_completed_locked(job));
1076 
1077     job_update_rc_locked(job);
1078     trace_job_completed(job, job->ret);
1079     if (job->ret) {
1080         job_completed_txn_abort_locked(job);
1081     } else {
1082         job_completed_txn_success_locked(job);
1083     }
1084 }
1085 
1086 /**
1087  * Useful only as a type shim for aio_bh_schedule_oneshot.
1088  * Called with job_mutex *not* held.
1089  */
1090 static void job_exit(void *opaque)
1091 {
1092     Job *job = (Job *)opaque;
1093     JOB_LOCK_GUARD();
1094     job_ref_locked(job);
1095 
1096     /* This is a lie, we're not quiescent, but still doing the completion
1097      * callbacks. However, completion callbacks tend to involve operations that
1098      * drain block nodes, and if .drained_poll still returned true, we would
1099      * deadlock. */
1100     job->busy = false;
1101     job_event_idle_locked(job);
1102 
1103     job_completed_locked(job);
1104     job_unref_locked(job);
1105 }
1106 
1107 /**
1108  * All jobs must allow a pause point before entering their job proper. This
1109  * ensures that jobs can be paused prior to being started, then resumed later.
1110  */
1111 static void coroutine_fn job_co_entry(void *opaque)
1112 {
1113     Job *job = opaque;
1114     int ret;
1115 
1116     assert(job && job->driver && job->driver->run);
1117     WITH_JOB_LOCK_GUARD() {
1118         assert(job->aio_context == qemu_get_current_aio_context());
1119         job_pause_point_locked(job);
1120     }
1121     ret = job->driver->run(job, &job->err);
1122     WITH_JOB_LOCK_GUARD() {
1123         job->ret = ret;
1124         job->deferred_to_main_loop = true;
1125         job->busy = true;
1126     }
1127     aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
1128 }
1129 
1130 void job_start(Job *job)
1131 {
1132     assert(qemu_in_main_thread());
1133 
1134     WITH_JOB_LOCK_GUARD() {
1135         assert(job && !job_started_locked(job) && job->paused &&
1136             job->driver && job->driver->run);
1137         job->co = qemu_coroutine_create(job_co_entry, job);
1138         job->pause_count--;
1139         job->busy = true;
1140         job->paused = false;
1141         job_state_transition_locked(job, JOB_STATUS_RUNNING);
1142     }
1143     aio_co_enter(job->aio_context, job->co);
1144 }
1145 
1146 void job_cancel_locked(Job *job, bool force)
1147 {
1148     if (job->status == JOB_STATUS_CONCLUDED) {
1149         job_do_dismiss_locked(job);
1150         return;
1151     }
1152     job_cancel_async_locked(job, force);
1153     if (!job_started_locked(job)) {
1154         job_completed_locked(job);
1155     } else if (job->deferred_to_main_loop) {
1156         /*
1157          * job_cancel_async() ignores soft-cancel requests for jobs
1158          * that are already done (i.e. deferred to the main loop).  We
1159          * have to check again whether the job is really cancelled.
1160          * (job_cancel_requested() and job_is_cancelled() are equivalent
1161          * here, because job_cancel_async() will make soft-cancel
1162          * requests no-ops when deferred_to_main_loop is true.  We
1163          * choose to call job_is_cancelled() to show that we invoke
1164          * job_completed_txn_abort() only for force-cancelled jobs.)
1165          */
1166         if (job_is_cancelled_locked(job)) {
1167             job_completed_txn_abort_locked(job);
1168         }
1169     } else {
1170         job_enter_cond_locked(job, NULL);
1171     }
1172 }
1173 
1174 void job_user_cancel_locked(Job *job, bool force, Error **errp)
1175 {
1176     if (job_apply_verb_locked(job, JOB_VERB_CANCEL, errp)) {
1177         return;
1178     }
1179     job_cancel_locked(job, force);
1180 }
1181 
1182 /* A wrapper around job_cancel_locked() taking an Error ** parameter so it may
1183  * be used with job_finish_sync_locked() without the need for (rather nasty)
1184  * function pointer casts there.
1185  *
1186  * Called with job_mutex held.
1187  */
1188 static void job_cancel_err_locked(Job *job, Error **errp)
1189 {
1190     job_cancel_locked(job, false);
1191 }
1192 
1193 /**
1194  * Same as job_cancel_err(), but force-cancel.
1195  * Called with job_mutex held.
1196  */
1197 static void job_force_cancel_err_locked(Job *job, Error **errp)
1198 {
1199     job_cancel_locked(job, true);
1200 }
1201 
1202 int job_cancel_sync_locked(Job *job, bool force)
1203 {
1204     if (force) {
1205         return job_finish_sync_locked(job, &job_force_cancel_err_locked, NULL);
1206     } else {
1207         return job_finish_sync_locked(job, &job_cancel_err_locked, NULL);
1208     }
1209 }
1210 
1211 int job_cancel_sync(Job *job, bool force)
1212 {
1213     JOB_LOCK_GUARD();
1214     return job_cancel_sync_locked(job, force);
1215 }
1216 
1217 void job_cancel_sync_all(void)
1218 {
1219     Job *job;
1220     JOB_LOCK_GUARD();
1221 
1222     while ((job = job_next_locked(NULL))) {
1223         job_cancel_sync_locked(job, true);
1224     }
1225 }
1226 
1227 int job_complete_sync_locked(Job *job, Error **errp)
1228 {
1229     return job_finish_sync_locked(job, job_complete_locked, errp);
1230 }
1231 
1232 void job_complete_locked(Job *job, Error **errp)
1233 {
1234     /* Should not be reachable via external interface for internal jobs */
1235     assert(job->id);
1236     GLOBAL_STATE_CODE();
1237     if (job_apply_verb_locked(job, JOB_VERB_COMPLETE, errp)) {
1238         return;
1239     }
1240     if (job_cancel_requested_locked(job) || !job->driver->complete) {
1241         error_setg(errp, "The active block job '%s' cannot be completed",
1242                    job->id);
1243         return;
1244     }
1245 
1246     job_unlock();
1247     job->driver->complete(job, errp);
1248     job_lock();
1249 }
1250 
1251 int job_finish_sync_locked(Job *job,
1252                            void (*finish)(Job *, Error **errp),
1253                            Error **errp)
1254 {
1255     Error *local_err = NULL;
1256     int ret;
1257     GLOBAL_STATE_CODE();
1258 
1259     job_ref_locked(job);
1260 
1261     if (finish) {
1262         finish(job, &local_err);
1263     }
1264     if (local_err) {
1265         error_propagate(errp, local_err);
1266         job_unref_locked(job);
1267         return -EBUSY;
1268     }
1269 
1270     job_unlock();
1271     AIO_WAIT_WHILE_UNLOCKED(job->aio_context,
1272                             (job_enter(job), !job_is_completed(job)));
1273     job_lock();
1274 
1275     ret = (job_is_cancelled_locked(job) && job->ret == 0)
1276           ? -ECANCELED : job->ret;
1277     job_unref_locked(job);
1278     return ret;
1279 }
1280