/*
 * QEMU System Emulator block driver
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
#include "block/blockjob_int.h"
#include "block/block_int.h"
#include "block/trace.h"
#include "sysemu/block-backend.h"
#include "qapi/error.h"
#include "qapi/qapi-events-block-core.h"
#include "qapi/qmp/qerror.h"
#include "qemu/coroutine.h"
#include "qemu/timer.h"

/* Right now, this mutex is only needed to synchronize accesses to job->busy
 * and job->sleep_timer, such as concurrent calls to block_job_do_yield and
 * block_job_enter. */
static QemuMutex block_job_mutex;

static void block_job_lock(void)
{
    qemu_mutex_lock(&block_job_mutex);
}

static void block_job_unlock(void)
{
    qemu_mutex_unlock(&block_job_mutex);
}

static void __attribute__((__constructor__)) block_job_init(void)
{
    qemu_mutex_init(&block_job_mutex);
}

static void block_job_event_cancelled(BlockJob *job);
static void block_job_event_completed(BlockJob *job, const char *msg);
static int block_job_event_pending(BlockJob *job);
static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job));

/* Transactional group of block jobs */
struct BlockJobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs */
    QLIST_HEAD(, BlockJob) jobs;

    /* Reference count */
    int refcnt;
};

/*
 * The block job API is composed of two categories of functions.
 *
 * The first includes functions used by the monitor. The monitor is
 * peculiar in that it accesses the block job list with block_job_get, and
 * therefore needs consistency across block_job_get and the actual operation
 * (e.g. block_job_set_speed). The consistency is achieved with
 * aio_context_acquire/release. These functions are declared in blockjob.h.
 *
 * The second includes functions used by the block job drivers and sometimes
 * by the core block layer. These do not care about locking, because the
 * whole coroutine runs under the AioContext lock, and are declared in
 * blockjob_int.h.
 */

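/*
 * Illustrative sketch (not part of this file): a monitor-side caller is
 * expected to bracket the lookup and the operation with the AioContext lock,
 * roughly along these lines.  "id" and "speed" are placeholders for whatever
 * the QMP handler received:
 *
 *     BlockJob *job = block_job_get(id);
 *     if (job) {
 *         AioContext *ctx = blk_get_aio_context(job->blk);
 *         aio_context_acquire(ctx);
 *         block_job_set_speed(job, speed, errp);
 *         aio_context_release(ctx);
 *     }
 */
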
static bool is_block_job(Job *job)
{
    return job_type(job) == JOB_TYPE_BACKUP ||
           job_type(job) == JOB_TYPE_COMMIT ||
           job_type(job) == JOB_TYPE_MIRROR ||
           job_type(job) == JOB_TYPE_STREAM;
}

BlockJob *block_job_next(BlockJob *bjob)
{
    Job *job = bjob ? &bjob->job : NULL;

    do {
        job = job_next(job);
    } while (job && !is_block_job(job));

    return job ? container_of(job, BlockJob, job) : NULL;
}

BlockJob *block_job_get(const char *id)
{
    Job *job = job_get(id);

    if (job && is_block_job(job)) {
        return container_of(job, BlockJob, job);
    } else {
        return NULL;
    }
}

BlockJobTxn *block_job_txn_new(void)
{
    BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
    QLIST_INIT(&txn->jobs);
    txn->refcnt = 1;
    return txn;
}

static void block_job_txn_ref(BlockJobTxn *txn)
{
    txn->refcnt++;
}

void block_job_txn_unref(BlockJobTxn *txn)
{
    if (txn && --txn->refcnt == 0) {
        g_free(txn);
    }
}

void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
{
    if (!txn) {
        return;
    }

    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    block_job_txn_ref(txn);
}

static void block_job_txn_del_job(BlockJob *job)
{
    if (job->txn) {
        QLIST_REMOVE(job, txn_list);
        block_job_txn_unref(job->txn);
        job->txn = NULL;
    }
}

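/*
 * Illustrative sketch (not part of this file): a transaction holds one
 * reference per member job plus the creator's own reference, so a caller that
 * groups jobs typically does something along these lines, with "job1" and
 * "job2" standing in for jobs obtained from block_job_create():
 *
 *     BlockJobTxn *txn = block_job_txn_new();
 *     block_job_txn_add_job(txn, job1);
 *     block_job_txn_add_job(txn, job2);
 *     block_job_txn_unref(txn);
 *
 * The member jobs keep the transaction alive until they are removed.
 */
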
/* Assumes the block_job_mutex is held */
static bool block_job_timer_pending(BlockJob *job)
{
    return timer_pending(&job->sleep_timer);
}

/* Assumes the block_job_mutex is held */
static bool block_job_timer_not_pending(BlockJob *job)
{
    return !block_job_timer_pending(job);
}

static void block_job_pause(BlockJob *job)
{
    job->pause_count++;
}

static void block_job_resume(BlockJob *job)
{
    assert(job->pause_count > 0);
    job->pause_count--;
    if (job->pause_count) {
        return;
    }

    /* kick only if no timer is pending */
    block_job_enter_cond(job, block_job_timer_not_pending);
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque);
static void block_job_detach_aio_context(void *opaque);

void block_job_free(Job *job)
{
    BlockJob *bjob = container_of(job, BlockJob, job);
    BlockDriverState *bs = blk_bs(bjob->blk);

    assert(!bjob->txn);

    bs->job = NULL;
    block_job_remove_all_bdrv(bjob);
    blk_remove_aio_context_notifier(bjob->blk,
                                    block_job_attached_aio_context,
                                    block_job_detach_aio_context, bjob);
    blk_unref(bjob->blk);
    error_free(bjob->blocker);
    assert(!timer_pending(&bjob->sleep_timer));
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque)
{
    BlockJob *job = opaque;

    if (job->driver->attached_aio_context) {
        job->driver->attached_aio_context(job, new_context);
    }

    block_job_resume(job);
}

static void block_job_drain(BlockJob *job)
{
    /* If the job is not busy (!job->busy), this kicks it into the next
     * pause point. */
    block_job_enter(job);

    blk_drain(job->blk);
    if (job->driver->drain) {
        job->driver->drain(job);
    }
}

static void block_job_detach_aio_context(void *opaque)
{
    BlockJob *job = opaque;

    /* In case the job terminates during aio_poll()... */
    job_ref(&job->job);

    block_job_pause(job);

    while (!job->paused && !job->completed) {
        block_job_drain(job);
    }

    job_unref(&job->job);
}

static char *child_job_get_parent_desc(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    return g_strdup_printf("%s job '%s'", job_type_str(&job->job), job->job.id);
}

static void child_job_drained_begin(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    block_job_pause(job);
}

static void child_job_drained_end(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    block_job_resume(job);
}

static const BdrvChildRole child_job = {
    .get_parent_desc = child_job_get_parent_desc,
    .drained_begin = child_job_drained_begin,
    .drained_end = child_job_drained_end,
    .stay_at_node = true,
};

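/* Detach every node previously attached with block_job_add_bdrv(): lift the
 * operation blockers and drop the per-node child references. */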
void block_job_remove_all_bdrv(BlockJob *job)
{
    GSList *l;
    for (l = job->nodes; l; l = l->next) {
        BdrvChild *c = l->data;
        bdrv_op_unblock_all(c->bs, job->blocker);
        bdrv_root_unref_child(c);
    }
    g_slist_free(job->nodes);
    job->nodes = NULL;
}

int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
                       uint64_t perm, uint64_t shared_perm, Error **errp)
{
    BdrvChild *c;

    c = bdrv_root_attach_child(bs, name, &child_job, perm, shared_perm,
                               job, errp);
    if (c == NULL) {
        return -EPERM;
    }

    job->nodes = g_slist_prepend(job->nodes, c);
    bdrv_ref(bs);
    bdrv_op_block_all(bs, job->blocker);

    return 0;
}

bool block_job_is_internal(BlockJob *job)
{
    return (job->job.id == NULL);
}

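/* A job counts as started once block_job_start() has created its coroutine. */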
static bool block_job_started(BlockJob *job)
{
    return job->co;
}

const BlockJobDriver *block_job_driver(BlockJob *job)
{
    return job->driver;
}

/**
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 */
static void coroutine_fn block_job_co_entry(void *opaque)
{
    BlockJob *job = opaque;

    assert(job && job->driver && job->driver->start);
    block_job_pause_point(job);
    job->driver->start(job);
}

static void block_job_sleep_timer_cb(void *opaque)
{
    BlockJob *job = opaque;

    block_job_enter(job);
}

void block_job_start(BlockJob *job)
{
    assert(job && !block_job_started(job) && job->paused &&
           job->driver && job->driver->start);
    job->co = qemu_coroutine_create(block_job_co_entry, job);
    job->pause_count--;
    job->busy = true;
    job->paused = false;
    job_state_transition(&job->job, JOB_STATUS_RUNNING);
    bdrv_coroutine_enter(blk_bs(job->blk), job->co);
}

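/* Mark the job as finished in every respect, move it to the NULL state and
 * drop the base reference taken at creation; job_unref() frees the BlockJob
 * once no other references remain. */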
static void block_job_decommission(BlockJob *job)
{
    assert(job);
    job->completed = true;
    job->busy = false;
    job->paused = false;
    job->deferred_to_main_loop = true;
    block_job_txn_del_job(job);
    job_state_transition(&job->job, JOB_STATUS_NULL);
    job_unref(&job->job);
}

static void block_job_do_dismiss(BlockJob *job)
{
    block_job_decommission(job);
}

static void block_job_conclude(BlockJob *job)
{
    job_state_transition(&job->job, JOB_STATUS_CONCLUDED);
    if (job->auto_dismiss || !block_job_started(job)) {
        block_job_do_dismiss(job);
    }
}

static void block_job_update_rc(BlockJob *job)
{
    if (!job->ret && block_job_is_cancelled(job)) {
        job->ret = -ECANCELED;
    }
    if (job->ret) {
        job_state_transition(&job->job, JOB_STATUS_ABORTING);
    }
}

static int block_job_prepare(BlockJob *job)
{
    if (job->ret == 0 && job->driver->prepare) {
        job->ret = job->driver->prepare(job);
    }
    return job->ret;
}

static void block_job_commit(BlockJob *job)
{
    assert(!job->ret);
    if (job->driver->commit) {
        job->driver->commit(job);
    }
}

static void block_job_abort(BlockJob *job)
{
    assert(job->ret);
    if (job->driver->abort) {
        job->driver->abort(job);
    }
}

static void block_job_clean(BlockJob *job)
{
    if (job->driver->clean) {
        job->driver->clean(job);
    }
}

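/* Finish one job: run the driver's commit or abort callback depending on the
 * return code, then clean(), invoke the completion callback, emit the QMP
 * completion or cancellation event, and move the job to CONCLUDED. */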
static int block_job_finalize_single(BlockJob *job)
{
    assert(job->completed);

    /* Ensure abort is called for late-transactional failures */
    block_job_update_rc(job);

    if (!job->ret) {
        block_job_commit(job);
    } else {
        block_job_abort(job);
    }
    block_job_clean(job);

    if (job->cb) {
        job->cb(job->opaque, job->ret);
    }

    /* Emit events only if we actually started */
    if (block_job_started(job)) {
        if (block_job_is_cancelled(job)) {
            block_job_event_cancelled(job);
        } else {
            const char *msg = NULL;
            if (job->ret < 0) {
                msg = strerror(-job->ret);
            }
            block_job_event_completed(job, msg);
        }
    }

    block_job_txn_del_job(job);
    block_job_conclude(job);
    return 0;
}

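/* Request asynchronous cancellation: reset the iostatus, lift a user pause so
 * the job can make progress towards its exit, and record whether this is a
 * forced cancel.  The caller is responsible for (re-)entering the job. */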
static void block_job_cancel_async(BlockJob *job, bool force)
{
    if (job->iostatus != BLOCK_DEVICE_IO_STATUS_OK) {
        block_job_iostatus_reset(job);
    }
    if (job->user_paused) {
        /* Do not call block_job_enter here, the caller will handle it. */
        job->user_paused = false;
        job->pause_count--;
    }
    job->cancelled = true;
    /* To prevent 'force == false' overriding a previous 'force == true' */
    job->force |= force;
}

static int block_job_txn_apply(BlockJobTxn *txn, int fn(BlockJob *), bool lock)
{
    AioContext *ctx;
    BlockJob *job, *next;
    int rc = 0;

    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
        if (lock) {
            ctx = blk_get_aio_context(job->blk);
            aio_context_acquire(ctx);
        }
        rc = fn(job);
        if (lock) {
            aio_context_release(ctx);
        }
        if (rc) {
            break;
        }
    }
    return rc;
}

static int block_job_finish_sync(BlockJob *job,
                                 void (*finish)(BlockJob *, Error **errp),
                                 Error **errp)
{
    Error *local_err = NULL;
    int ret;

    assert(blk_bs(job->blk)->job == job);

    job_ref(&job->job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        job_unref(&job->job);
        return -EBUSY;
    }
    /* block_job_drain calls block_job_enter, and it should be enough to
     * induce progress until the job completes or moves to the main thread.
     */
    while (!job->deferred_to_main_loop && !job->completed) {
        block_job_drain(job);
    }
    while (!job->completed) {
        aio_poll(qemu_get_aio_context(), true);
    }
    ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
    job_unref(&job->job);
    return ret;
}

static void block_job_completed_txn_abort(BlockJob *job)
{
    AioContext *ctx;
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    block_job_txn_ref(txn);

    /* We are the first failed job. Cancel other jobs. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        ctx = blk_get_aio_context(other_job->blk);
        aio_context_acquire(ctx);
    }

    /* The other jobs are effectively cancelled by us, so set their status
     * accordingly; this job itself may or may not be cancelled, depending on
     * the caller, so leave its status alone. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            block_job_cancel_async(other_job, false);
        }
    }
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        ctx = blk_get_aio_context(other_job->blk);
        if (!other_job->completed) {
            assert(other_job->cancelled);
            block_job_finish_sync(other_job, NULL, NULL);
        }
        block_job_finalize_single(other_job);
        aio_context_release(ctx);
    }

    block_job_txn_unref(txn);
}

static int block_job_needs_finalize(BlockJob *job)
{
    return !job->auto_finalize;
}

static void block_job_do_finalize(BlockJob *job)
{
    int rc;
    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = block_job_txn_apply(job->txn, block_job_prepare, true);
    if (rc) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_txn_apply(job->txn, block_job_finalize_single, true);
    }
}

static void block_job_completed_txn_success(BlockJob *job)
{
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job;

    job_state_transition(&job->job, JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!other_job->completed) {
            return;
        }
        assert(other_job->ret == 0);
    }

    block_job_txn_apply(txn, block_job_event_pending, false);

    /* If no jobs need manual finalization, automatically do so */
    if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) {
        block_job_do_finalize(job);
    }
}

void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
    int64_t old_speed = job->speed;

    if (job_apply_verb(&job->job, JOB_VERB_SET_SPEED, errp)) {
        return;
    }
    if (speed < 0) {
        error_setg(errp, QERR_INVALID_PARAMETER, "speed");
        return;
    }

    ratelimit_set_speed(&job->limit, speed, BLOCK_JOB_SLICE_TIME);

    job->speed = speed;
    if (speed && speed <= old_speed) {
        return;
    }

    /* kick only if a timer is pending */
    block_job_enter_cond(job, block_job_timer_pending);
}

int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
{
    if (!job->speed) {
        return 0;
    }

    return ratelimit_calculate_delay(&job->limit, n);
}

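/*
 * Illustrative sketch (not part of this file): a driver that has just
 * processed "n" units typically converts the rate limit into a sleep in its
 * main loop, so the sleep doubles as a pause/cancellation point:
 *
 *     int64_t delay_ns = block_job_ratelimit_get_delay(job, n);
 *     block_job_sleep_ns(job, delay_ns);
 */
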
void block_job_complete(BlockJob *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->job.id);
    if (job_apply_verb(&job->job, JOB_VERB_COMPLETE, errp)) {
        return;
    }
    if (job->pause_count || job->cancelled || !job->driver->complete) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->job.id);
        return;
    }

    job->driver->complete(job, errp);
}

void block_job_finalize(BlockJob *job, Error **errp)
{
    assert(job && job->job.id);
    if (job_apply_verb(&job->job, JOB_VERB_FINALIZE, errp)) {
        return;
    }
    block_job_do_finalize(job);
}

void block_job_dismiss(BlockJob **jobptr, Error **errp)
{
    BlockJob *job = *jobptr;
    /* similarly to _complete, this is QMP-interface only. */
    assert(job->job.id);
    if (job_apply_verb(&job->job, JOB_VERB_DISMISS, errp)) {
        return;
    }

    block_job_do_dismiss(job);
    *jobptr = NULL;
}

void block_job_user_pause(BlockJob *job, Error **errp)
{
    if (job_apply_verb(&job->job, JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    block_job_pause(job);
}

bool block_job_user_paused(BlockJob *job)
{
    return job->user_paused;
}

void block_job_user_resume(BlockJob *job, Error **errp)
{
    assert(job);
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (job_apply_verb(&job->job, JOB_VERB_RESUME, errp)) {
        return;
    }
    block_job_iostatus_reset(job);
    job->user_paused = false;
    block_job_resume(job);
}

void block_job_cancel(BlockJob *job, bool force)
{
    if (job->job.status == JOB_STATUS_CONCLUDED) {
        block_job_do_dismiss(job);
        return;
    }
    block_job_cancel_async(job, force);
    if (!block_job_started(job)) {
        block_job_completed(job, -ECANCELED);
    } else if (job->deferred_to_main_loop) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_enter(job);
    }
}

void block_job_user_cancel(BlockJob *job, bool force, Error **errp)
{
    if (job_apply_verb(&job->job, JOB_VERB_CANCEL, errp)) {
        return;
    }
    block_job_cancel(job, force);
}

/* A wrapper around block_job_cancel() taking an Error ** parameter so it may be
 * used with block_job_finish_sync() without the need for (rather nasty)
 * function pointer casts there. */
static void block_job_cancel_err(BlockJob *job, Error **errp)
{
    block_job_cancel(job, false);
}

int block_job_cancel_sync(BlockJob *job)
{
    return block_job_finish_sync(job, &block_job_cancel_err, NULL);
}

void block_job_cancel_sync_all(void)
{
    BlockJob *job;
    AioContext *aio_context;

    while ((job = block_job_next(NULL))) {
        aio_context = blk_get_aio_context(job->blk);
        aio_context_acquire(aio_context);
        block_job_cancel_sync(job);
        aio_context_release(aio_context);
    }
}

int block_job_complete_sync(BlockJob *job, Error **errp)
{
    return block_job_finish_sync(job, &block_job_complete, errp);
}

void block_job_progress_update(BlockJob *job, uint64_t done)
{
    job->offset += done;
}

void block_job_progress_set_remaining(BlockJob *job, uint64_t remaining)
{
    job->len = job->offset + remaining;
}

BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
{
    BlockJobInfo *info;

    if (block_job_is_internal(job)) {
        error_setg(errp, "Cannot query QEMU internal jobs");
        return NULL;
    }
    info = g_new0(BlockJobInfo, 1);
    info->type = g_strdup(job_type_str(&job->job));
    info->device = g_strdup(job->job.id);
    info->len = job->len;
    info->busy = atomic_read(&job->busy);
    info->paused = job->pause_count > 0;
    info->offset = job->offset;
    info->speed = job->speed;
    info->io_status = job->iostatus;
    info->ready = job->ready;
    info->status = job->job.status;
    info->auto_finalize = job->auto_finalize;
    info->auto_dismiss = job->auto_dismiss;
    info->has_error = job->ret != 0;
    info->error = job->ret ? g_strdup(strerror(-job->ret)) : NULL;
    return info;
}

static void block_job_iostatus_set_err(BlockJob *job, int error)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
                                          BLOCK_DEVICE_IO_STATUS_FAILED;
    }
}

static void block_job_event_cancelled(BlockJob *job)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_cancelled(job_type(&job->job),
                                        job->job.id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        &error_abort);
}

static void block_job_event_completed(BlockJob *job, const char *msg)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_completed(job_type(&job->job),
                                        job->job.id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        !!msg,
                                        msg,
                                        &error_abort);
}

static int block_job_event_pending(BlockJob *job)
{
    job_state_transition(&job->job, JOB_STATUS_PENDING);
    if (!job->auto_finalize && !block_job_is_internal(job)) {
        qapi_event_send_block_job_pending(job_type(&job->job),
                                          job->job.id,
                                          &error_abort);
    }
    return 0;
}

/*
 * API for block job drivers and the block layer. These functions are
 * declared in blockjob_int.h.
 */

void *block_job_create(const char *job_id, const BlockJobDriver *driver,
                       BlockJobTxn *txn, BlockDriverState *bs, uint64_t perm,
                       uint64_t shared_perm, int64_t speed, int flags,
                       BlockCompletionFunc *cb, void *opaque, Error **errp)
{
    BlockBackend *blk;
    BlockJob *job;
    int ret;

    if (bs->job) {
        error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs));
        return NULL;
    }

    if (job_id == NULL && !(flags & BLOCK_JOB_INTERNAL)) {
        job_id = bdrv_get_device_name(bs);
        if (!*job_id) {
            error_setg(errp, "An explicit job ID is required for this node");
            return NULL;
        }
    }

    if (job_id) {
        if (flags & BLOCK_JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal block job");
            return NULL;
        }
    }

    blk = blk_new(perm, shared_perm);
    ret = blk_insert_bs(blk, bs, errp);
    if (ret < 0) {
        blk_unref(blk);
        return NULL;
    }

    job = job_create(job_id, &driver->job_driver, errp);
    if (job == NULL) {
        blk_unref(blk);
        return NULL;
    }

    assert(is_block_job(&job->job));
    assert(job->job.driver->free == &block_job_free);

    job->driver = driver;
    job->blk = blk;
    job->cb = cb;
    job->opaque = opaque;
    job->busy = false;
    job->paused = true;
    job->pause_count = 1;
    job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE);
    job->auto_dismiss = !(flags & BLOCK_JOB_MANUAL_DISMISS);
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   block_job_sleep_timer_cb, job);

    error_setg(&job->blocker, "block device is in use by block job: %s",
               job_type_str(&job->job));
    block_job_add_bdrv(job, "main node", bs, 0, BLK_PERM_ALL, &error_abort);
    bs->job = job;

    bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);

    blk_add_aio_context_notifier(blk, block_job_attached_aio_context,
                                 block_job_detach_aio_context, job);

    /* Only set speed when necessary to avoid NotSupported error */
    if (speed != 0) {
        Error *local_err = NULL;

        block_job_set_speed(job, speed, &local_err);
        if (local_err) {
            block_job_early_fail(job);
            error_propagate(errp, local_err);
            return NULL;
        }
    }

    /* Single jobs are modeled as single-job transactions for the sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = block_job_txn_new();
        block_job_txn_add_job(txn, job);
        block_job_txn_unref(txn);
    } else {
        block_job_txn_add_job(txn, job);
    }

    return job;
}

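/*
 * Illustrative sketch (not part of this file): a concrete job driver embeds
 * BlockJob at the start of its own state structure and casts the void *
 * returned above.  "MyBlockJob" and "my_job_driver" are placeholders:
 *
 *     typedef struct MyBlockJob {
 *         BlockJob common;
 *         // driver-specific state ...
 *     } MyBlockJob;
 *
 *     MyBlockJob *s = block_job_create(job_id, &my_job_driver, txn, bs,
 *                                      perm, shared_perm, speed, flags,
 *                                      cb, opaque, errp);
 */
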
void block_job_early_fail(BlockJob *job)
{
    assert(job->job.status == JOB_STATUS_CREATED);
    block_job_decommission(job);
}

void block_job_completed(BlockJob *job, int ret)
{
    assert(job && job->txn && !job->completed);
    assert(blk_bs(job->blk)->job == job);
    job->completed = true;
    job->ret = ret;
    block_job_update_rc(job);
    trace_block_job_completed(job, ret, job->ret);
    if (job->ret) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_completed_txn_success(job);
    }
}

static bool block_job_should_pause(BlockJob *job)
{
    return job->pause_count > 0;
}

/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with block_job_enter() before the timer has
 * expired is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
 * called explicitly. */
static void block_job_do_yield(BlockJob *job, uint64_t ns)
{
    block_job_lock();
    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    job->busy = false;
    block_job_unlock();
    qemu_coroutine_yield();

    /* Set by block_job_enter before re-entering the coroutine. */
    assert(job->busy);
}

void coroutine_fn block_job_pause_point(BlockJob *job)
{
    assert(job && block_job_started(job));

    if (!block_job_should_pause(job)) {
        return;
    }
    if (block_job_is_cancelled(job)) {
        return;
    }

    if (job->driver->pause) {
        job->driver->pause(job);
    }

    if (block_job_should_pause(job) && !block_job_is_cancelled(job)) {
        JobStatus status = job->job.status;
        job_state_transition(&job->job, status == JOB_STATUS_READY
                                        ? JOB_STATUS_STANDBY
                                        : JOB_STATUS_PAUSED);
        job->paused = true;
        block_job_do_yield(job, -1);
        job->paused = false;
        job_state_transition(&job->job, status);
    }

    if (job->driver->resume) {
        job->driver->resume(job);
    }
}

/*
 * Conditionally enter the block job's coroutine: the job is re-entered only
 * if fn() is NULL or returns true, with fn() evaluated inside the
 * block_job_lock critical section.
 */
static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job))
{
    if (!block_job_started(job)) {
        return;
    }
    if (job->deferred_to_main_loop) {
        return;
    }

    block_job_lock();
    if (job->busy) {
        block_job_unlock();
        return;
    }

    if (fn && !fn(job)) {
        block_job_unlock();
        return;
    }

    assert(!job->deferred_to_main_loop);
    timer_del(&job->sleep_timer);
    job->busy = true;
    block_job_unlock();
    aio_co_wake(job->co);
}

void block_job_enter(BlockJob *job)
{
    block_job_enter_cond(job, NULL);
}

bool block_job_is_cancelled(BlockJob *job)
{
    return job->cancelled;
}

void block_job_sleep_ns(BlockJob *job, int64_t ns)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too! */
    if (block_job_is_cancelled(job)) {
        return;
    }

    if (!block_job_should_pause(job)) {
        block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    block_job_pause_point(job);
}

void block_job_yield(BlockJob *job)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too! */
    if (block_job_is_cancelled(job)) {
        return;
    }

    if (!block_job_should_pause(job)) {
        block_job_do_yield(job, -1);
    }

    block_job_pause_point(job);
}

void block_job_iostatus_reset(BlockJob *job)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        return;
    }
    assert(job->user_paused && job->pause_count > 0);
    job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
}

void block_job_event_ready(BlockJob *job)
{
    job_state_transition(&job->job, JOB_STATUS_READY);
    job->ready = true;

    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_ready(job_type(&job->job),
                                    job->job.id,
                                    job->len,
                                    job->offset,
                                    job->speed, &error_abort);
}

BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                        int is_read, int error)
{
    BlockErrorAction action;

    switch (on_err) {
    case BLOCKDEV_ON_ERROR_ENOSPC:
    case BLOCKDEV_ON_ERROR_AUTO:
        action = (error == ENOSPC) ?
                 BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_STOP:
        action = BLOCK_ERROR_ACTION_STOP;
        break;
    case BLOCKDEV_ON_ERROR_REPORT:
        action = BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_IGNORE:
        action = BLOCK_ERROR_ACTION_IGNORE;
        break;
    default:
        abort();
    }
    if (!block_job_is_internal(job)) {
        qapi_event_send_block_job_error(job->job.id,
                                        is_read ? IO_OPERATION_TYPE_READ :
                                        IO_OPERATION_TYPE_WRITE,
                                        action, &error_abort);
    }
    if (action == BLOCK_ERROR_ACTION_STOP) {
        block_job_pause(job);
        /* Make the pause user-visible so that it can be resumed from QMP. */
        job->user_paused = true;
        block_job_iostatus_set_err(job, error);
    }
    return action;
}

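/*
 * Illustrative sketch (not part of this file): a driver that hits an I/O
 * error typically lets block_job_error_action() decide between reporting,
 * ignoring and stopping, e.g. with "ret" holding the negative errno from the
 * failed request:
 *
 *     BlockErrorAction action = block_job_error_action(job, on_err,
 *                                                      is_read, -ret);
 *     if (action == BLOCK_ERROR_ACTION_REPORT) {
 *         return ret;
 *     }
 */
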
typedef struct {
    BlockJob *job;
    AioContext *aio_context;
    BlockJobDeferToMainLoopFn *fn;
    void *opaque;
} BlockJobDeferToMainLoopData;

static void block_job_defer_to_main_loop_bh(void *opaque)
{
    BlockJobDeferToMainLoopData *data = opaque;
    AioContext *aio_context;

    /* Prevent race with block_job_defer_to_main_loop() */
    aio_context_acquire(data->aio_context);

    /* Fetch BDS AioContext again, in case it has changed */
    aio_context = blk_get_aio_context(data->job->blk);
    if (aio_context != data->aio_context) {
        aio_context_acquire(aio_context);
    }

    data->fn(data->job, data->opaque);

    if (aio_context != data->aio_context) {
        aio_context_release(aio_context);
    }

    aio_context_release(data->aio_context);

    g_free(data);
}

void block_job_defer_to_main_loop(BlockJob *job,
                                  BlockJobDeferToMainLoopFn *fn,
                                  void *opaque)
{
    BlockJobDeferToMainLoopData *data = g_malloc(sizeof(*data));
    data->job = job;
    data->aio_context = blk_get_aio_context(job->blk);
    data->fn = fn;
    data->opaque = opaque;
    job->deferred_to_main_loop = true;

    aio_bh_schedule_oneshot(qemu_get_aio_context(),
                            block_job_defer_to_main_loop_bh, data);
}
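
/*
 * Illustrative sketch (not part of this file): a driver's coroutine usually
 * finishes by deferring its completion work to the main loop, where the
 * deferred function calls block_job_completed().  "my_job_exit" and
 * "MyExitData" are placeholders:
 *
 *     static void my_job_exit(BlockJob *job, void *opaque)
 *     {
 *         MyExitData *data = opaque;
 *         block_job_completed(job, data->ret);
 *         g_free(data);
 *     }
 *
 *     // at the end of the driver's .start coroutine:
 *     block_job_defer_to_main_loop(job, my_job_exit, data);
 */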