xref: /qemu/job.c (revision 2abf0da2)
1 /*
2  * Background jobs (long-running operations)
3  *
4  * Copyright (c) 2011 IBM Corp.
5  * Copyright (c) 2012, 2018 Red Hat, Inc.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23  * THE SOFTWARE.
24  */
25 
26 #include "qemu/osdep.h"
27 #include "qapi/error.h"
28 #include "qemu/job.h"
29 #include "qemu/id.h"
30 #include "qemu/main-loop.h"
31 #include "block/aio-wait.h"
32 #include "trace/trace-root.h"
33 #include "qapi/qapi-events-job.h"
34 
35 /*
36  * The job API is composed of two categories of functions.
37  *
38  * The first includes functions used by the monitor.  The monitor is
39  * peculiar in that it accesses the job list with job_get, and
40  * therefore needs consistency across job_get and the actual operation
41  * (e.g. job_user_cancel). To achieve this consistency, the caller
42  * calls job_lock/job_unlock itself around the whole operation.
43  *
44  *
45  * The second includes functions used by the job drivers and sometimes
46  * by the core block layer. These delegate the locking to the callee instead.
47  */
48 
/*
 * job_mutex protects the jobs list, and also protects the fields of
 * every Job struct, making them thread-safe.
 */
53 QemuMutex job_mutex;
54 
55 /* Protected by job_mutex */
56 static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);
57 
/*
 * Job State Transition Table: JobSTT[s0][s1] is true iff a transition
 * from status s0 to status s1 is allowed.  Enforced by the assertion in
 * job_state_transition_locked().
 */
bool JobSTT[JOB_STATUS__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
};
73 
/*
 * JobVerbTable[verb][status] is true iff the user-initiated @verb is
 * permitted on a job in @status.  Checked by job_apply_verb_locked().
 */
bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    [JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
    [JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
    [JOB_VERB_CHANGE]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
};
85 
/*
 * Transactional group of jobs: all jobs in a transaction either
 * complete together or are cancelled together.  All fields are
 * protected by job_mutex.
 */
struct JobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs (linked through Job.txn_list) */
    QLIST_HEAD(, Job) jobs;

    /* Reference count; the txn is freed when it drops to zero */
    int refcnt;
};
98 
/* Acquire job_mutex; pairs with job_unlock(). */
void job_lock(void)
{
    qemu_mutex_lock(&job_mutex);
}
103 
/* Release job_mutex; pairs with job_lock(). */
void job_unlock(void)
{
    qemu_mutex_unlock(&job_mutex);
}
108 
/* Runs automatically before main(): initialize the global job_mutex. */
static void __attribute__((__constructor__)) job_init(void)
{
    qemu_mutex_init(&job_mutex);
}
113 
114 JobTxn *job_txn_new(void)
115 {
116     JobTxn *txn = g_new0(JobTxn, 1);
117     QLIST_INIT(&txn->jobs);
118     txn->refcnt = 1;
119     return txn;
120 }
121 
/* Take an extra reference on @txn.  Called with job_mutex held. */
static void job_txn_ref_locked(JobTxn *txn)
{
    txn->refcnt++;
}
127 
128 void job_txn_unref_locked(JobTxn *txn)
129 {
130     if (txn && --txn->refcnt == 0) {
131         g_free(txn);
132     }
133 }
134 
/* Like job_txn_unref_locked(), but takes job_mutex itself. */
void job_txn_unref(JobTxn *txn)
{
    JOB_LOCK_GUARD();
    job_txn_unref_locked(txn);
}
140 
/**
 * @txn: The transaction (may be NULL)
 * @job: Job to add to the transaction
 *
 * Add @job to the transaction.  The @job must not already be in a transaction.
 * The caller must call either job_txn_unref() or job_completed() to release
 * the reference that is automatically grabbed here.
 *
 * If @txn is NULL, the function does nothing.
 *
 * Called with job_mutex held.
 */
static void job_txn_add_job_locked(JobTxn *txn, Job *job)
{
    if (!txn) {
        return;
    }

    /* A job can belong to at most one transaction at a time */
    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    job_txn_ref_locked(txn);
}
165 
166 /* Called with job_mutex held. */
167 static void job_txn_del_job_locked(Job *job)
168 {
169     if (job->txn) {
170         QLIST_REMOVE(job, txn_list);
171         job_txn_unref_locked(job->txn);
172         job->txn = NULL;
173     }
174 }
175 
/*
 * Apply @fn to every job in @job's transaction, stopping at the first
 * nonzero return value, which is also the return value of this
 * function.
 *
 * Called with job_mutex held; @fn may release it temporarily (e.g. to
 * run AIO_WAIT_WHILE), so the list is walked with the _SAFE variant and
 * an extra reference is held on @job -- which also keeps the
 * transaction itself alive until we are done.
 */
static int job_txn_apply_locked(Job *job, int fn(Job *))
{
    Job *other_job, *next;
    JobTxn *txn = job->txn;
    int rc = 0;

    job_ref_locked(job);

    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
        rc = fn(other_job);
        if (rc) {
            break;
        }
    }

    job_unref_locked(job);
    return rc;
}
201 
202 bool job_is_internal(Job *job)
203 {
204     return (job->id == NULL);
205 }
206 
/*
 * Move @job to status @s1, asserting that the transition is allowed by
 * JobSTT, and emit a QMP JOB_STATUS_CHANGE event for non-internal jobs.
 * Called with job_mutex held.
 */
static void job_state_transition_locked(Job *job, JobStatus s1)
{
    JobStatus s0 = job->status;
    assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
    trace_job_state_transition(job, job->ret,
                               JobSTT[s0][s1] ? "allowed" : "disallowed",
                               JobStatus_str(s0), JobStatus_str(s1));
    assert(JobSTT[s0][s1]);
    job->status = s1;

    /* Only notify on an actual change, and never for internal jobs */
    if (!job_is_internal(job) && s1 != s0) {
        qapi_event_send_job_status_change(job->id, job->status);
    }
}
222 
223 int job_apply_verb_locked(Job *job, JobVerb verb, Error **errp)
224 {
225     JobStatus s0 = job->status;
226     assert(verb >= 0 && verb < JOB_VERB__MAX);
227     trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb),
228                          JobVerbTable[verb][s0] ? "allowed" : "prohibited");
229     if (JobVerbTable[verb][s0]) {
230         return 0;
231     }
232     error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
233                job->id, JobStatus_str(s0), JobVerb_str(verb));
234     return -EPERM;
235 }
236 
/* Return the driver-defined type of @job. */
JobType job_type(const Job *job)
{
    return job->driver->job_type;
}
241 
242 const char *job_type_str(const Job *job)
243 {
244     return JobType_str(job_type(job));
245 }
246 
/*
 * Whether @job has been *force*-cancelled.  A soft cancel request
 * (job->cancelled without force) is reported by
 * job_cancel_requested_locked() instead.
 */
bool job_is_cancelled_locked(Job *job)
{
    /* force_cancel may be true only if cancelled is true, too */
    assert(job->cancelled || !job->force_cancel);
    return job->force_cancel;
}
253 
/* Like job_is_cancelled_locked(), but takes job_mutex itself. */
bool job_is_cancelled(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_cancelled_locked(job);
}
259 
/*
 * Whether any cancel (soft or forced) has been requested for @job.
 * Called with job_mutex held.
 */
static bool job_cancel_requested_locked(Job *job)
{
    return job->cancelled;
}
265 
/* Like job_cancel_requested_locked(), but takes job_mutex itself. */
bool job_cancel_requested(Job *job)
{
    JOB_LOCK_GUARD();
    return job_cancel_requested_locked(job);
}
271 
/*
 * A job is "ready" when it has reached its synchronous phase (READY),
 * including when it is temporarily paused there (STANDBY).
 * Called with job_mutex held.
 */
bool job_is_ready_locked(Job *job)
{
    switch (job->status) {
    case JOB_STATUS_UNDEFINED:
    case JOB_STATUS_CREATED:
    case JOB_STATUS_RUNNING:
    case JOB_STATUS_PAUSED:
    case JOB_STATUS_WAITING:
    case JOB_STATUS_PENDING:
    case JOB_STATUS_ABORTING:
    case JOB_STATUS_CONCLUDED:
    case JOB_STATUS_NULL:
        return false;
    case JOB_STATUS_READY:
    case JOB_STATUS_STANDBY:
        return true;
    default:
        g_assert_not_reached();
    }
    /* not reached; all enum values are handled above */
    return false;
}
293 
/* Like job_is_ready_locked(), but takes job_mutex itself. */
bool job_is_ready(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_ready_locked(job);
}
299 
/*
 * A job is "completed" once its coroutine has finished running and it
 * has entered the finalization states (WAITING and later).
 * Called with job_mutex held.
 */
bool job_is_completed_locked(Job *job)
{
    switch (job->status) {
    case JOB_STATUS_UNDEFINED:
    case JOB_STATUS_CREATED:
    case JOB_STATUS_RUNNING:
    case JOB_STATUS_PAUSED:
    case JOB_STATUS_READY:
    case JOB_STATUS_STANDBY:
        return false;
    case JOB_STATUS_WAITING:
    case JOB_STATUS_PENDING:
    case JOB_STATUS_ABORTING:
    case JOB_STATUS_CONCLUDED:
    case JOB_STATUS_NULL:
        return true;
    default:
        g_assert_not_reached();
    }
    /* not reached; all enum values are handled above */
    return false;
}
321 
/* Like job_is_completed_locked(), but takes job_mutex itself. */
static bool job_is_completed(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_completed_locked(job);
}
327 
/*
 * A job counts as started once its coroutine has been created (see
 * job_start()).  Called with job_mutex held.
 */
static bool job_started_locked(Job *job)
{
    return job->co;
}
332 
/*
 * Whether the job should pause at the next pause point, i.e. someone
 * has requested a pause via pause_count.  Called with job_mutex held.
 */
static bool job_should_pause_locked(Job *job)
{
    return job->pause_count > 0;
}
338 
339 Job *job_next_locked(Job *job)
340 {
341     if (!job) {
342         return QLIST_FIRST(&jobs);
343     }
344     return QLIST_NEXT(job, job_list);
345 }
346 
/* Like job_next_locked(), but takes job_mutex itself. */
Job *job_next(Job *job)
{
    JOB_LOCK_GUARD();
    return job_next_locked(job);
}
352 
353 Job *job_get_locked(const char *id)
354 {
355     Job *job;
356 
357     QLIST_FOREACH(job, &jobs, job_list) {
358         if (job->id && !strcmp(id, job->id)) {
359             return job;
360         }
361     }
362 
363     return NULL;
364 }
365 
/*
 * Change the AioContext the job runs in.  The job must be quiescent
 * (paused or already completed) so no coroutine is concurrently using
 * the old context.
 */
void job_set_aio_context(Job *job, AioContext *ctx)
{
    /* protect against read in job_finish_sync_locked and job_start */
    GLOBAL_STATE_CODE();
    /* protect against read in job_do_yield_locked */
    JOB_LOCK_GUARD();
    /* ensure the job is quiescent while the AioContext is changed */
    assert(job->paused || job_is_completed_locked(job));
    job->aio_context = ctx;
}
376 
/*
 * Timer callback for job->sleep_timer: wake the job coroutine when a
 * job_sleep_ns() deadline expires.  Called with job_mutex *not* held.
 */
static void job_sleep_timer_cb(void *opaque)
{
    Job *job = opaque;

    job_enter(job);
}
384 
/*
 * Allocate and register a new job.
 *
 * @job_id: user-visible ID, required unless JOB_INTERNAL is in @flags
 * @driver: driver providing instance_size, callbacks and the job type
 * @txn: transaction to join, or NULL for a private single-job txn
 * @ctx: AioContext the job coroutine will run in
 * @flags: JOB_* creation flags (internal / manual finalize / dismiss)
 * @cb: completion callback invoked with @opaque and the job's ret
 *
 * Returns the new job (cast to the driver's instance type by callers),
 * or NULL with @errp set on invalid/duplicate IDs.  The job starts out
 * paused (pause_count == 1) until job_start() is called.
 */
void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
                 void *opaque, Error **errp)
{
    Job *job;

    JOB_LOCK_GUARD();

    if (job_id) {
        if (flags & JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal job");
            return NULL;
        }
        if (!id_wellformed(job_id)) {
            error_setg(errp, "Invalid job ID '%s'", job_id);
            return NULL;
        }
        if (job_get_locked(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;
        }
    } else if (!(flags & JOB_INTERNAL)) {
        error_setg(errp, "An explicit job ID is required");
        return NULL;
    }

    /* instance_size covers the driver-specific struct embedding Job */
    job = g_malloc0(driver->instance_size);
    job->driver        = driver;
    job->id            = g_strdup(job_id);
    job->refcnt        = 1;
    job->aio_context   = ctx;
    job->busy          = false;
    /* created paused; job_start() drops this initial pause */
    job->paused        = true;
    job->pause_count   = 1;
    job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
    job->auto_dismiss  = !(flags & JOB_MANUAL_DISMISS);
    job->cb            = cb;
    job->opaque        = opaque;

    progress_init(&job->progress);

    notifier_list_init(&job->on_finalize_cancelled);
    notifier_list_init(&job->on_finalize_completed);
    notifier_list_init(&job->on_pending);
    notifier_list_init(&job->on_ready);
    notifier_list_init(&job->on_idle);

    job_state_transition_locked(job, JOB_STATUS_CREATED);
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   job_sleep_timer_cb, job);

    QLIST_INSERT_HEAD(&jobs, job, job_list);

    /* Single jobs are modeled as single-job transactions for sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = job_txn_new();
        job_txn_add_job_locked(txn, job);
        job_txn_unref_locked(txn);
    } else {
        job_txn_add_job_locked(txn, job);
    }

    return job;
}
451 
/* Take an extra reference on @job.  Called with job_mutex held. */
void job_ref_locked(Job *job)
{
    ++job->refcnt;
}
456 
/*
 * Drop one reference on @job; when the last reference is gone, remove
 * the job from the global list and free it.  The job must already be in
 * the NULL state with no pending timer and no transaction.
 * Called with job_mutex held, but releases it temporarily around the
 * driver's free() callback.
 */
void job_unref_locked(Job *job)
{
    GLOBAL_STATE_CODE();

    if (--job->refcnt == 0) {
        assert(job->status == JOB_STATUS_NULL);
        assert(!timer_pending(&job->sleep_timer));
        assert(!job->txn);

        if (job->driver->free) {
            /* driver callbacks run without job_mutex */
            job_unlock();
            job->driver->free(job);
            job_lock();
        }

        QLIST_REMOVE(job, job_list);

        progress_destroy(&job->progress);
        error_free(job->err);
        g_free(job->id);
        g_free(job);
    }
}
480 
/* Account @done units of completed work to the job's progress meter. */
void job_progress_update(Job *job, uint64_t done)
{
    progress_work_done(&job->progress, done);
}
485 
/* Set the amount of work still remaining in the job's progress meter. */
void job_progress_set_remaining(Job *job, uint64_t remaining)
{
    progress_set_remaining(&job->progress, remaining);
}
490 
/* Add @delta units to the remaining work in the job's progress meter. */
void job_progress_increase_remaining(Job *job, uint64_t delta)
{
    progress_increase_remaining(&job->progress, delta);
}
495 
/**
 * To be called when a cancelled job is finalised.
 * Called with job_mutex held.
 */
static void job_event_cancelled_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_cancelled, job);
}
504 
/**
 * To be called when a successfully completed job is finalised.
 * Called with job_mutex held.
 */
static void job_event_completed_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_completed, job);
}
513 
/*
 * Notify listeners that the job reached the PENDING state.
 * Called with job_mutex held.
 */
static void job_event_pending_locked(Job *job)
{
    notifier_list_notify(&job->on_pending, job);
}
519 
/*
 * Notify listeners that the job reached the READY state.
 * Called with job_mutex held.
 */
static void job_event_ready_locked(Job *job)
{
    notifier_list_notify(&job->on_ready, job);
}
525 
/*
 * Notify listeners that the job coroutine went idle (yielded).
 * Called with job_mutex held.
 */
static void job_event_idle_locked(Job *job)
{
    notifier_list_notify(&job->on_idle, job);
}
531 
/*
 * Wake the job coroutine if it is currently yielded and @fn (when
 * non-NULL) approves.  Does nothing for jobs that have not started,
 * are deferred to the main loop, or are currently busy (running).
 * Called with job_mutex held, but releases it around aio_co_wake().
 */
void job_enter_cond_locked(Job *job, bool(*fn)(Job *job))
{
    if (!job_started_locked(job)) {
        return;
    }
    if (job->deferred_to_main_loop) {
        return;
    }

    if (job->busy) {
        return;
    }

    if (fn && !fn(job)) {
        return;
    }

    assert(!job->deferred_to_main_loop);
    /* cancel any pending job_sleep_ns() wakeup; we wake it up now */
    timer_del(&job->sleep_timer);
    job->busy = true;
    job_unlock();
    aio_co_wake(job->co);
    job_lock();
}
556 
/* Unconditionally wake the job coroutine; takes job_mutex itself. */
void job_enter(Job *job)
{
    JOB_LOCK_GUARD();
    job_enter_cond_locked(job, NULL);
}
562 
/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with job_enter() before the timer has expired
 * is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
 * called explicitly.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void coroutine_fn job_do_yield_locked(Job *job, uint64_t ns)
{
    AioContext *next_aio_context;

    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    job->busy = false;
    job_event_idle_locked(job);
    job_unlock();
    qemu_coroutine_yield();
    job_lock();

    next_aio_context = job->aio_context;
    /*
     * Coroutine has resumed, but in the meanwhile the job AioContext
     * might have changed via bdrv_try_change_aio_context(), so we need to move
     * the coroutine too in the new aiocontext.
     */
    while (qemu_get_current_aio_context() != next_aio_context) {
        job_unlock();
        aio_co_reschedule_self(next_aio_context);
        job_lock();
        /* the context may have changed again while we were rescheduling */
        next_aio_context = job->aio_context;
    }

    /* Set by job_enter_cond_locked() before re-entering the coroutine.  */
    assert(job->busy);
}
601 
/*
 * Voluntary pause point: if a pause has been requested (and the job is
 * not cancelled), transition to PAUSED/STANDBY, yield until resumed,
 * and restore the previous status.  Invokes the driver's pause/resume
 * callbacks around the pause.
 * Called with job_mutex held, but releases it temporarily.
 */
static void coroutine_fn job_pause_point_locked(Job *job)
{
    assert(job && job_started_locked(job));

    if (!job_should_pause_locked(job)) {
        return;
    }
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (job->driver->pause) {
        /* driver callbacks run without job_mutex */
        job_unlock();
        job->driver->pause(job);
        job_lock();
    }

    /* re-check: the situation may have changed while unlocked above */
    if (job_should_pause_locked(job) && !job_is_cancelled_locked(job)) {
        JobStatus status = job->status;
        job_state_transition_locked(job, status == JOB_STATUS_READY
                                    ? JOB_STATUS_STANDBY
                                    : JOB_STATUS_PAUSED);
        job->paused = true;
        job_do_yield_locked(job, -1);
        job->paused = false;
        job_state_transition_locked(job, status);
    }

    if (job->driver->resume) {
        /* driver callbacks run without job_mutex */
        job_unlock();
        job->driver->resume(job);
        job_lock();
    }
}
637 
/* Like job_pause_point_locked(), but takes job_mutex itself. */
void coroutine_fn job_pause_point(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_point_locked(job);
}
643 
/*
 * Yield the job coroutine until it is re-entered with job_enter(),
 * honouring pause and cancel requests.
 */
void coroutine_fn job_yield(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    /* if a pause was requested, let job_pause_point_locked() yield */
    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, -1);
    }

    job_pause_point_locked(job);
}
660 
/*
 * Yield the job coroutine for @ns nanoseconds (or until woken earlier
 * by job_enter()), honouring pause and cancel requests.
 */
void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    /* if a pause was requested, let job_pause_point_locked() yield */
    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    job_pause_point_locked(job);
}
677 
/*
 * Condition for job_enter_cond_locked(): only wake the job when no
 * sleep timer is pending.  Assumes the job_mutex is held.
 */
static bool job_timer_not_pending_locked(Job *job)
{
    return !timer_pending(&job->sleep_timer);
}
683 
684 void job_pause_locked(Job *job)
685 {
686     job->pause_count++;
687     if (!job->paused) {
688         job_enter_cond_locked(job, NULL);
689     }
690 }
691 
/* Like job_pause_locked(), but takes job_mutex itself. */
void job_pause(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_locked(job);
}
697 
698 void job_resume_locked(Job *job)
699 {
700     assert(job->pause_count > 0);
701     job->pause_count--;
702     if (job->pause_count) {
703         return;
704     }
705 
706     /* kick only if no timer is pending */
707     job_enter_cond_locked(job, job_timer_not_pending_locked);
708 }
709 
/* Like job_resume_locked(), but takes job_mutex itself. */
void job_resume(Job *job)
{
    JOB_LOCK_GUARD();
    job_resume_locked(job);
}
715 
/*
 * User-initiated pause (QMP job-pause): checks the verb table, rejects
 * a double pause, then pauses via job_pause_locked().
 * Called with job_mutex held.
 */
void job_user_pause_locked(Job *job, Error **errp)
{
    if (job_apply_verb_locked(job, JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    job_pause_locked(job);
}
728 
/* Whether the job was paused by the user.  Called with job_mutex held. */
bool job_user_paused_locked(Job *job)
{
    return job->user_paused;
}
733 
/*
 * User-initiated resume (QMP job-resume): only valid for a job that was
 * user-paused.  Runs the driver's user_resume callback (without the
 * lock) before resuming.
 * Called with job_mutex held, but releases it temporarily.
 */
void job_user_resume_locked(Job *job, Error **errp)
{
    assert(job);
    GLOBAL_STATE_CODE();
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (job_apply_verb_locked(job, JOB_VERB_RESUME, errp)) {
        return;
    }
    if (job->driver->user_resume) {
        /* driver callbacks run without job_mutex */
        job_unlock();
        job->driver->user_resume(job);
        job_lock();
    }
    job->user_paused = false;
    job_resume_locked(job);
}
753 
/*
 * Detach the job from its transaction, move it to the NULL state and
 * drop the creation reference (which may free it).
 * Called with job_mutex held, but releases it temporarily (via
 * job_unref_locked() when the driver's free callback runs).
 */
static void job_do_dismiss_locked(Job *job)
{
    assert(job);
    job->busy = false;
    job->paused = false;
    job->deferred_to_main_loop = true;

    job_txn_del_job_locked(job);

    job_state_transition_locked(job, JOB_STATUS_NULL);
    job_unref_locked(job);
}
767 
/*
 * User-initiated dismiss (QMP job-dismiss) of a CONCLUDED job.  Clears
 * the caller's pointer on success so it cannot be reused.
 * Called with job_mutex held.
 */
void job_dismiss_locked(Job **jobptr, Error **errp)
{
    Job *job = *jobptr;
    /* similarly to _complete, this is QMP-interface only. */
    assert(job->id);
    if (job_apply_verb_locked(job, JOB_VERB_DISMISS, errp)) {
        return;
    }

    job_do_dismiss_locked(job);
    *jobptr = NULL;
}
780 
/* Dispose of a job that failed before it was ever started (CREATED). */
void job_early_fail(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->status == JOB_STATUS_CREATED);
    job_do_dismiss_locked(job);
}
787 
/*
 * Move the job to CONCLUDED, and dismiss it right away if auto_dismiss
 * is set or the job never started.  Called with job_mutex held.
 */
static void job_conclude_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_CONCLUDED);
    if (job->auto_dismiss || !job_started_locked(job)) {
        job_do_dismiss_locked(job);
    }
}
796 
/*
 * Fold cancellation into job->ret (-ECANCELED), synthesize job->err
 * from the errno value if the driver did not set one, and move a
 * failing job to ABORTING.  Called with job_mutex held.
 */
static void job_update_rc_locked(Job *job)
{
    if (!job->ret && job_is_cancelled_locked(job)) {
        job->ret = -ECANCELED;
    }
    if (job->ret) {
        if (!job->err) {
            error_setg(&job->err, "%s", strerror(-job->ret));
        }
        job_state_transition_locked(job, JOB_STATUS_ABORTING);
    }
}
810 
/*
 * Run the driver's commit callback for a successfully completed job
 * (job->ret must be 0).  Called with job_mutex *not* held.
 */
static void job_commit(Job *job)
{
    assert(!job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->commit) {
        job->driver->commit(job);
    }
}
819 
/*
 * Run the driver's abort callback for a failed job (job->ret must be
 * nonzero).  Called with job_mutex *not* held.
 */
static void job_abort(Job *job)
{
    assert(job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->abort) {
        job->driver->abort(job);
    }
}
828 
/*
 * Run the driver's clean callback, which runs after either commit or
 * abort.  Called with job_mutex *not* held.
 */
static void job_clean(Job *job)
{
    GLOBAL_STATE_CODE();
    if (job->driver->clean) {
        job->driver->clean(job);
    }
}
836 
/*
 * Finalize one completed job: run commit/abort + clean and the
 * completion callback (all without job_mutex), emit the
 * cancelled/completed notifier, detach from the transaction and
 * conclude.  Always returns 0 so it can be used with
 * job_txn_apply_locked().
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static int job_finalize_single_locked(Job *job)
{
    int job_ret;

    assert(job_is_completed_locked(job));

    /* Ensure abort is called for late-transactional failures */
    job_update_rc_locked(job);

    /* snapshot ret while locked; callbacks below run unlocked */
    job_ret = job->ret;
    job_unlock();

    if (!job_ret) {
        job_commit(job);
    } else {
        job_abort(job);
    }
    job_clean(job);

    if (job->cb) {
        job->cb(job->opaque, job_ret);
    }

    job_lock();

    /* Emit events only if we actually started */
    if (job_started_locked(job)) {
        if (job_is_cancelled_locked(job)) {
            job_event_cancelled_locked(job);
        } else {
            job_event_completed_locked(job);
        }
    }

    job_txn_del_job_locked(job);
    job_conclude_locked(job);
    return 0;
}
878 
/*
 * Asynchronously request cancellation of @job.  @force distinguishes a
 * hard cancel from a soft one; the driver's cancel() callback (run
 * without the lock) may override it.  A user-paused job is implicitly
 * resumed so it can observe the request.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void job_cancel_async_locked(Job *job, bool force)
{
    GLOBAL_STATE_CODE();
    if (job->driver->cancel) {
        /* driver callbacks run without job_mutex */
        job_unlock();
        force = job->driver->cancel(job, force);
        job_lock();
    } else {
        /* No .cancel() means the job will behave as if force-cancelled */
        force = true;
    }

    if (job->user_paused) {
        /* Do not call job_enter here, the caller will handle it.  */
        if (job->driver->user_resume) {
            job_unlock();
            job->driver->user_resume(job);
            job_lock();
        }
        job->user_paused = false;
        assert(job->pause_count > 0);
        job->pause_count--;
    }

    /*
     * Ignore soft cancel requests after the job is already done
     * (We will still invoke job->driver->cancel() above, but if the
     * job driver supports soft cancelling and the job is done, that
     * should be a no-op, too.  We still call it so it can override
     * @force.)
     */
    if (force || !job->deferred_to_main_loop) {
        job->cancelled = true;
        /* To prevent 'force == false' overriding a previous 'force == true' */
        job->force_cancel |= force;
    }
}
919 
/*
 * Abort the whole transaction because @job failed: force-cancel every
 * other job in the txn, wait for them to finish, and finalize all of
 * them.  Re-entrant calls (another job in the same txn aborting) are
 * no-ops thanks to txn->aborting.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void job_completed_txn_abort_locked(Job *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    /* keep txn and job alive across the finalization below */
    job_txn_ref_locked(txn);

    job_ref_locked(job);

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            /*
             * This is a transaction: If one job failed, no result will matter.
             * Therefore, pass force=true to terminate all other jobs as quickly
             * as possible.
             */
            job_cancel_async_locked(other_job, true);
        }
    }
    /* job_finalize_single_locked() removes each job from txn->jobs */
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        if (!job_is_completed_locked(other_job)) {
            assert(job_cancel_requested_locked(other_job));
            job_finish_sync_locked(other_job, NULL, NULL);
        }
        job_finalize_single_locked(other_job);
    }

    job_unref_locked(job);
    job_txn_unref_locked(txn);
}
964 
/*
 * Run the driver's prepare() callback (without the lock) for a job that
 * has not failed yet, and fold its result into job->ret.  Returns the
 * resulting job->ret.
 * Called with job_mutex held, but releases it temporarily.
 */
static int job_prepare_locked(Job *job)
{
    int ret;

    GLOBAL_STATE_CODE();

    if (job->ret == 0 && job->driver->prepare) {
        job_unlock();
        ret = job->driver->prepare(job);
        job_lock();
        job->ret = ret;
        job_update_rc_locked(job);
    }

    return job->ret;
}
982 
983 /* Called with job_mutex held */
984 static int job_needs_finalize_locked(Job *job)
985 {
986     return !job->auto_finalize;
987 }
988 
/*
 * Finalize a transaction: run every job's prepare(); on failure abort
 * the whole txn, otherwise finalize every job in it.
 * Called with job_mutex held (released temporarily by the callees).
 */
static void job_do_finalize_locked(Job *job)
{
    int rc;
    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = job_txn_apply_locked(job, job_prepare_locked);
    if (rc) {
        job_completed_txn_abort_locked(job);
    } else {
        job_txn_apply_locked(job, job_finalize_single_locked);
    }
}
1003 
/*
 * User-initiated finalize (QMP job-finalize); verb-checked.
 * Called with job_mutex held.
 */
void job_finalize_locked(Job *job, Error **errp)
{
    /* QMP interface only, so a user-visible ID is required */
    assert(job && job->id);
    if (job_apply_verb_locked(job, JOB_VERB_FINALIZE, errp)) {
        return;
    }
    job_do_finalize_locked(job);
}
1012 
/*
 * Move the job to PENDING and, for manually-finalized jobs, notify the
 * pending listeners.  Always returns 0 so it can be used with
 * job_txn_apply_locked().  Called with job_mutex held.
 */
static int job_transition_to_pending_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_PENDING);
    if (!job->auto_finalize) {
        job_event_pending_locked(job);
    }
    return 0;
}
1022 
/* Move the job to READY and notify the ready listeners. */
void job_transition_to_ready(Job *job)
{
    JOB_LOCK_GUARD();
    job_state_transition_locked(job, JOB_STATUS_READY);
    job_event_ready_locked(job);
}
1029 
/*
 * Successful completion of @job: move it to WAITING; once every job in
 * the transaction has completed, move them all to PENDING and, if none
 * requires manual finalization, finalize the whole transaction.
 * Called with job_mutex held.
 */
static void job_completed_txn_success_locked(Job *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    job_state_transition_locked(job, JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!job_is_completed_locked(other_job)) {
            return;
        }
        assert(other_job->ret == 0);
    }

    job_txn_apply_locked(job, job_transition_to_pending_locked);

    /* If no jobs need manual finalization, automatically do so */
    if (job_txn_apply_locked(job, job_needs_finalize_locked) == 0) {
        job_do_finalize_locked(job);
    }
}
1056 
/*
 * The job coroutine has finished: fold cancellation/errors into
 * job->ret and route to the success or abort path for its transaction.
 * Called with job_mutex held.
 */
static void job_completed_locked(Job *job)
{
    assert(job && job->txn && !job_is_completed_locked(job));

    job_update_rc_locked(job);
    trace_job_completed(job, job->ret);
    if (job->ret) {
        job_completed_txn_abort_locked(job);
    } else {
        job_completed_txn_success_locked(job);
    }
}
1070 
/**
 * Main-loop bottom half that completes the job after its coroutine has
 * returned (scheduled by job_co_entry()).
 * Useful only as a type shim for aio_bh_schedule_oneshot.
 * Called with job_mutex *not* held.
 */
static void job_exit(void *opaque)
{
    Job *job = (Job *)opaque;
    JOB_LOCK_GUARD();
    /* keep the job alive across completion and finalization */
    job_ref_locked(job);

    /* This is a lie, we're not quiescent, but still doing the completion
     * callbacks. However, completion callbacks tend to involve operations that
     * drain block nodes, and if .drained_poll still returned true, we would
     * deadlock. */
    job->busy = false;
    job_event_idle_locked(job);

    job_completed_locked(job);
    job_unref_locked(job);
}
1091 
/**
 * Coroutine entry point for every job.
 *
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 * After the driver's run() returns, completion is deferred to the main
 * loop via job_exit().
 */
static void coroutine_fn job_co_entry(void *opaque)
{
    Job *job = opaque;
    int ret;

    assert(job && job->driver && job->driver->run);
    WITH_JOB_LOCK_GUARD() {
        assert(job->aio_context == qemu_get_current_aio_context());
        job_pause_point_locked(job);
    }
    /* the driver's main loop runs without job_mutex */
    ret = job->driver->run(job, &job->err);
    WITH_JOB_LOCK_GUARD() {
        job->ret = ret;
        job->deferred_to_main_loop = true;
        /* stay "busy" so job_enter_cond_locked() cannot re-enter us */
        job->busy = true;
    }
    aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
}
1114 
/*
 * Start a freshly created job: create its coroutine, mark it RUNNING,
 * and enter the coroutine in the job's AioContext.
 *
 * Main-loop code only (asserted below).
 */
void job_start(Job *job)
{
    assert(qemu_in_main_thread());

    WITH_JOB_LOCK_GUARD() {
        /* The job must be fully set up and not yet started or resumed. */
        assert(job && !job_started_locked(job) && job->paused &&
            job->driver && job->driver->run);
        job->co = qemu_coroutine_create(job_co_entry, job);
        /* Undo the pause the job carries before starting (the assert
         * above requires job->paused on entry). */
        job->pause_count--;
        job->busy = true;
        job->paused = false;
        job_state_transition_locked(job, JOB_STATUS_RUNNING);
    }
    /* Enter the coroutine outside the lock; job_co_entry() takes it
     * itself as needed. */
    aio_co_enter(job->aio_context, job->co);
}
1130 
/*
 * Request cancellation of @job, soft (@force == false) or forced.
 * How the request takes effect depends on how far the job has got:
 *  - CONCLUDED jobs are simply dismissed;
 *  - jobs that never started are completed right away;
 *  - jobs already deferred to the main loop are aborted only if they
 *    really ended up force-cancelled (see the comment below);
 *  - running jobs are woken so they can observe the request.
 *
 * Called with job_mutex held.
 */
void job_cancel_locked(Job *job, bool force)
{
    if (job->status == JOB_STATUS_CONCLUDED) {
        job_do_dismiss_locked(job);
        return;
    }
    job_cancel_async_locked(job, force);
    if (!job_started_locked(job)) {
        job_completed_locked(job);
    } else if (job->deferred_to_main_loop) {
        /*
         * job_cancel_async() ignores soft-cancel requests for jobs
         * that are already done (i.e. deferred to the main loop).  We
         * have to check again whether the job is really cancelled.
         * (job_cancel_requested() and job_is_cancelled() are equivalent
         * here, because job_cancel_async() will make soft-cancel
         * requests no-ops when deferred_to_main_loop is true.  We
         * choose to call job_is_cancelled() to show that we invoke
         * job_completed_txn_abort() only for force-cancelled jobs.)
         */
        if (job_is_cancelled_locked(job)) {
            job_completed_txn_abort_locked(job);
        }
    } else {
        job_enter_cond_locked(job, NULL);
    }
}
1158 
1159 void job_user_cancel_locked(Job *job, bool force, Error **errp)
1160 {
1161     if (job_apply_verb_locked(job, JOB_VERB_CANCEL, errp)) {
1162         return;
1163     }
1164     job_cancel_locked(job, force);
1165 }
1166 
/* A wrapper around job_cancel_locked() taking an Error ** parameter so it may
 * be used with job_finish_sync_locked() without the need for (rather nasty)
 * function pointer casts there.
 *
 * Soft-cancels the job; @errp is accepted only to match the callback
 * signature and is never set.
 *
 * Called with job_mutex held.
 */
static void job_cancel_err_locked(Job *job, Error **errp)
{
    job_cancel_locked(job, false);
}
1177 
/**
 * Same as job_cancel_err(), but force-cancel.
 * @errp is accepted only to match the job_finish_sync_locked() callback
 * signature and is never set.
 * Called with job_mutex held.
 */
static void job_force_cancel_err_locked(Job *job, Error **errp)
{
    job_cancel_locked(job, true);
}
1186 
1187 int job_cancel_sync_locked(Job *job, bool force)
1188 {
1189     if (force) {
1190         return job_finish_sync_locked(job, &job_force_cancel_err_locked, NULL);
1191     } else {
1192         return job_finish_sync_locked(job, &job_cancel_err_locked, NULL);
1193     }
1194 }
1195 
1196 int job_cancel_sync(Job *job, bool force)
1197 {
1198     JOB_LOCK_GUARD();
1199     return job_cancel_sync_locked(job, force);
1200 }
1201 
1202 void job_cancel_sync_all(void)
1203 {
1204     Job *job;
1205     JOB_LOCK_GUARD();
1206 
1207     while ((job = job_next_locked(NULL))) {
1208         job_cancel_sync_locked(job, true);
1209     }
1210 }
1211 
/*
 * Request completion of @job and synchronously wait for it to finish.
 * Returns the job's final return code (or -ECANCELED, see
 * job_finish_sync_locked()).
 *
 * Called with job_mutex held.
 */
int job_complete_sync_locked(Job *job, Error **errp)
{
    return job_finish_sync_locked(job, job_complete_locked, errp);
}
1216 
/*
 * Ask @job to complete early via its driver's .complete callback.
 * Sets @errp and returns without effect if the COMPLETE verb is not
 * allowed in the current state, if cancellation was already requested,
 * or if the driver does not support early completion.
 *
 * Called with job_mutex held; the lock is dropped around the driver
 * callback.
 */
void job_complete_locked(Job *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->id);
    GLOBAL_STATE_CODE();
    if (job_apply_verb_locked(job, JOB_VERB_COMPLETE, errp)) {
        return;
    }
    /* A pending cancel request wins over completion. */
    if (job_cancel_requested_locked(job) || !job->driver->complete) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    /* The driver callback runs without job_mutex held. */
    job_unlock();
    job->driver->complete(job, errp);
    job_lock();
}
1235 
/*
 * Run @finish on @job (if non-NULL), then wait until the job has
 * completed.
 *
 * Returns -EBUSY if @finish itself reported an error, -ECANCELED if the
 * job was cancelled without setting its own error code, and the job's
 * final return code otherwise.
 *
 * Called with job_mutex held; the lock is dropped while polling for
 * completion.
 */
int job_finish_sync_locked(Job *job,
                           void (*finish)(Job *, Error **errp),
                           Error **errp)
{
    Error *local_err = NULL;
    int ret;
    GLOBAL_STATE_CODE();

    /* Keep the job object alive until we have read its final ret. */
    job_ref_locked(job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        job_unref_locked(job);
        return -EBUSY;
    }

    /* Kick the job and poll until it reports completion; job_mutex must
     * not be held while the main loop runs. */
    job_unlock();
    AIO_WAIT_WHILE_UNLOCKED(job->aio_context,
                            (job_enter(job), !job_is_completed(job)));
    job_lock();

    /* Report cancellation as -ECANCELED unless the job already carries
     * its own error code. */
    ret = (job_is_cancelled_locked(job) && job->ret == 0)
          ? -ECANCELED : job->ret;
    job_unref_locked(job);
    return ret;
}
1265