/*
 * QEMU System Emulator block driver
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
#include "block/blockjob_int.h"
#include "block/block_int.h"
#include "block/trace.h"
#include "sysemu/block-backend.h"
#include "qapi/error.h"
#include "qapi/qapi-events-block-core.h"
#include "qapi/qmp/qerror.h"
#include "qemu/coroutine.h"
#include "qemu/timer.h"

/* Right now, this mutex is only needed to synchronize accesses to job->busy
 * and job->sleep_timer, such as concurrent calls to block_job_do_yield and
 * block_job_enter. */
static QemuMutex block_job_mutex;

static void block_job_lock(void)
{
    qemu_mutex_lock(&block_job_mutex);
}

static void block_job_unlock(void)
{
    qemu_mutex_unlock(&block_job_mutex);
}

static void __attribute__((__constructor__)) block_job_init(void)
{
    qemu_mutex_init(&block_job_mutex);
}

static void block_job_event_cancelled(BlockJob *job);
static void block_job_event_completed(BlockJob *job, const char *msg);
static int block_job_event_pending(BlockJob *job);
static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job));

/* Transactional group of block jobs */
struct BlockJobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs */
    QLIST_HEAD(, BlockJob) jobs;

    /* Reference count */
    int refcnt;
};
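
/*
 * Informal usage sketch: callers that want several jobs to complete or fail
 * as a unit create one transaction and attach each job to it.  The
 * transaction is reference-counted, and block_job_txn_add_job() takes a
 * reference on behalf of the job, so the creator's reference can be dropped
 * once the jobs are attached:
 *
 *     BlockJobTxn *txn = block_job_txn_new();
 *     block_job_txn_add_job(txn, job1);
 *     block_job_txn_add_job(txn, job2);
 *     block_job_txn_unref(txn);
 */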

/*
 * The block job API is composed of two categories of functions.
 *
 * The first includes functions used by the monitor. The monitor is
 * peculiar in that it accesses the block job list with block_job_get, and
 * therefore needs consistency across block_job_get and the actual operation
 * (e.g. block_job_set_speed). The consistency is achieved with
 * aio_context_acquire/release. These functions are declared in blockjob.h.
 *
 * The second includes functions used by the block job drivers and sometimes
 * by the core block layer. These do not care about locking, because the
 * whole coroutine runs under the AioContext lock, and are declared in
 * blockjob_int.h.
 */

static bool is_block_job(Job *job)
{
    return job_type(job) == JOB_TYPE_BACKUP ||
           job_type(job) == JOB_TYPE_COMMIT ||
           job_type(job) == JOB_TYPE_MIRROR ||
           job_type(job) == JOB_TYPE_STREAM;
}

BlockJob *block_job_next(BlockJob *bjob)
{
    Job *job = bjob ? &bjob->job : NULL;

    do {
        job = job_next(job);
    } while (job && !is_block_job(job));

    return job ? container_of(job, BlockJob, job) : NULL;
}

BlockJob *block_job_get(const char *id)
{
    Job *job = job_get(id);

    if (job && is_block_job(job)) {
        return container_of(job, BlockJob, job);
    } else {
        return NULL;
    }
}

BlockJobTxn *block_job_txn_new(void)
{
    BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
    QLIST_INIT(&txn->jobs);
    txn->refcnt = 1;
    return txn;
}

static void block_job_txn_ref(BlockJobTxn *txn)
{
    txn->refcnt++;
}

void block_job_txn_unref(BlockJobTxn *txn)
{
    if (txn && --txn->refcnt == 0) {
        g_free(txn);
    }
}

void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
{
    if (!txn) {
        return;
    }

    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    block_job_txn_ref(txn);
}

static void block_job_txn_del_job(BlockJob *job)
{
    if (job->txn) {
        QLIST_REMOVE(job, txn_list);
        block_job_txn_unref(job->txn);
        job->txn = NULL;
    }
}

/* Assumes the block_job_mutex is held */
static bool block_job_timer_pending(BlockJob *job)
{
    return timer_pending(&job->sleep_timer);
}

/* Assumes the block_job_mutex is held */
static bool block_job_timer_not_pending(BlockJob *job)
{
    return !block_job_timer_pending(job);
}

static void block_job_pause(BlockJob *job)
{
    job->pause_count++;
}

static void block_job_resume(BlockJob *job)
{
    assert(job->pause_count > 0);
    job->pause_count--;
    if (job->pause_count) {
        return;
    }

    /* kick only if no timer is pending */
    block_job_enter_cond(job, block_job_timer_not_pending);
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque);
static void block_job_detach_aio_context(void *opaque);

void block_job_free(Job *job)
{
    BlockJob *bjob = container_of(job, BlockJob, job);
    BlockDriverState *bs = blk_bs(bjob->blk);

    assert(!bjob->txn);

    bs->job = NULL;
    block_job_remove_all_bdrv(bjob);
    blk_remove_aio_context_notifier(bjob->blk,
                                    block_job_attached_aio_context,
                                    block_job_detach_aio_context, bjob);
    blk_unref(bjob->blk);
    error_free(bjob->blocker);
    assert(!timer_pending(&bjob->sleep_timer));
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque)
{
    BlockJob *job = opaque;

    job->job.aio_context = new_context;
    if (job->driver->attached_aio_context) {
        job->driver->attached_aio_context(job, new_context);
    }

    block_job_resume(job);
}
static void block_job_drain(BlockJob *job)
{
    /* If the job is sleeping (!job->busy), this kicks it into its next
     * pause point. */
    block_job_enter(job);

    blk_drain(job->blk);
    if (job->driver->drain) {
        job->driver->drain(job);
    }
}

static void block_job_detach_aio_context(void *opaque)
{
    BlockJob *job = opaque;

    /* In case the job terminates during aio_poll()... */
    job_ref(&job->job);

    block_job_pause(job);

    while (!job->paused && !job->completed) {
        block_job_drain(job);
    }

    job->job.aio_context = NULL;
    job_unref(&job->job);
}

static char *child_job_get_parent_desc(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    return g_strdup_printf("%s job '%s'", job_type_str(&job->job), job->job.id);
}

static void child_job_drained_begin(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    block_job_pause(job);
}

static void child_job_drained_end(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    block_job_resume(job);
}

static const BdrvChildRole child_job = {
    .get_parent_desc = child_job_get_parent_desc,
    .drained_begin = child_job_drained_begin,
    .drained_end = child_job_drained_end,
    .stay_at_node = true,
};

void block_job_remove_all_bdrv(BlockJob *job)
{
    GSList *l;
    for (l = job->nodes; l; l = l->next) {
        BdrvChild *c = l->data;
        bdrv_op_unblock_all(c->bs, job->blocker);
        bdrv_root_unref_child(c);
    }
    g_slist_free(job->nodes);
    job->nodes = NULL;
}

int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
                       uint64_t perm, uint64_t shared_perm, Error **errp)
{
    BdrvChild *c;

    c = bdrv_root_attach_child(bs, name, &child_job, perm, shared_perm,
                               job, errp);
    if (c == NULL) {
        return -EPERM;
    }

    job->nodes = g_slist_prepend(job->nodes, c);
    bdrv_ref(bs);
    bdrv_op_block_all(bs, job->blocker);

    return 0;
}
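
/*
 * Informal example (names hypothetical): a mirror-style driver that writes to
 * a separate target node might register that node with the permissions it
 * needs, letting other users keep everything except write access:
 *
 *     ret = block_job_add_bdrv(job, "target", target_bs,
 *                              BLK_PERM_WRITE,
 *                              BLK_PERM_ALL & ~BLK_PERM_WRITE, errp);
 *
 * On failure the child is not attached and -EPERM is returned with errp set.
 */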

bool block_job_is_internal(BlockJob *job)
{
    return (job->job.id == NULL);
}

static bool block_job_started(BlockJob *job)
{
    return job->co;
}

const BlockJobDriver *block_job_driver(BlockJob *job)
{
    return job->driver;
}

/**
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 */
static void coroutine_fn block_job_co_entry(void *opaque)
{
    BlockJob *job = opaque;

    assert(job && job->driver && job->driver->start);
    block_job_pause_point(job);
    job->driver->start(job);
}

static void block_job_sleep_timer_cb(void *opaque)
{
    BlockJob *job = opaque;

    block_job_enter(job);
}

void block_job_start(BlockJob *job)
{
    assert(job && !block_job_started(job) && job->paused &&
           job->driver && job->driver->start);
    job->co = qemu_coroutine_create(block_job_co_entry, job);
    job->pause_count--;
    job->busy = true;
    job->paused = false;
    job_state_transition(&job->job, JOB_STATUS_RUNNING);
    bdrv_coroutine_enter(blk_bs(job->blk), job->co);
}

static void block_job_decommission(BlockJob *job)
{
    assert(job);
    job->completed = true;
    job->busy = false;
    job->paused = false;
    job->deferred_to_main_loop = true;
    block_job_txn_del_job(job);
    job_state_transition(&job->job, JOB_STATUS_NULL);
    job_unref(&job->job);
}

static void block_job_do_dismiss(BlockJob *job)
{
    block_job_decommission(job);
}

static void block_job_conclude(BlockJob *job)
{
    job_state_transition(&job->job, JOB_STATUS_CONCLUDED);
    if (job->auto_dismiss || !block_job_started(job)) {
        block_job_do_dismiss(job);
    }
}

static void block_job_update_rc(BlockJob *job)
{
    if (!job->ret && job_is_cancelled(&job->job)) {
        job->ret = -ECANCELED;
    }
    if (job->ret) {
        job_state_transition(&job->job, JOB_STATUS_ABORTING);
    }
}

static int block_job_prepare(BlockJob *job)
{
    if (job->ret == 0 && job->driver->prepare) {
        job->ret = job->driver->prepare(job);
    }
    return job->ret;
}

static void block_job_commit(BlockJob *job)
{
    assert(!job->ret);
    if (job->driver->commit) {
        job->driver->commit(job);
    }
}

static void block_job_abort(BlockJob *job)
{
    assert(job->ret);
    if (job->driver->abort) {
        job->driver->abort(job);
    }
}

static void block_job_clean(BlockJob *job)
{
    if (job->driver->clean) {
        job->driver->clean(job);
    }
}

static int block_job_finalize_single(BlockJob *job)
{
    assert(job->completed);

    /* Ensure abort is called for late-transactional failures */
    block_job_update_rc(job);

    if (!job->ret) {
        block_job_commit(job);
    } else {
        block_job_abort(job);
    }
    block_job_clean(job);

    if (job->cb) {
        job->cb(job->opaque, job->ret);
    }

    /* Emit events only if we actually started */
    if (block_job_started(job)) {
        if (job_is_cancelled(&job->job)) {
            block_job_event_cancelled(job);
        } else {
            const char *msg = NULL;
            if (job->ret < 0) {
                msg = strerror(-job->ret);
            }
            block_job_event_completed(job, msg);
        }
    }

    block_job_txn_del_job(job);
    block_job_conclude(job);
    return 0;
}
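
/*
 * Informal summary of block_job_finalize_single(): the final return code is
 * fixed up first (a cancelled job without an error becomes -ECANCELED), then
 * exactly one of the driver's .commit or .abort callbacks runs, .clean always
 * runs, the completion callback (if any) fires, QMP events are emitted for
 * user-visible jobs that actually started, and the job is finally concluded.
 */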

static void block_job_cancel_async(BlockJob *job, bool force)
{
    if (job->iostatus != BLOCK_DEVICE_IO_STATUS_OK) {
        block_job_iostatus_reset(job);
    }
    if (job->user_paused) {
        /* Do not call block_job_enter here, the caller will handle it. */
        job->user_paused = false;
        job->pause_count--;
    }
    job->job.cancelled = true;
    /* To prevent 'force == false' overriding a previous 'force == true' */
    job->force |= force;
}

static int block_job_txn_apply(BlockJobTxn *txn, int fn(BlockJob *), bool lock)
{
    AioContext *ctx;
    BlockJob *job, *next;
    int rc = 0;

    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
        if (lock) {
            ctx = blk_get_aio_context(job->blk);
            aio_context_acquire(ctx);
        }
        rc = fn(job);
        if (lock) {
            aio_context_release(ctx);
        }
        if (rc) {
            break;
        }
    }
    return rc;
}

static int block_job_finish_sync(BlockJob *job,
                                 void (*finish)(BlockJob *, Error **errp),
                                 Error **errp)
{
    Error *local_err = NULL;
    int ret;

    assert(blk_bs(job->blk)->job == job);

    job_ref(&job->job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        job_unref(&job->job);
        return -EBUSY;
    }
    /* block_job_drain calls block_job_enter, and it should be enough to
     * induce progress until the job completes or moves to the main thread.
     */
    while (!job->deferred_to_main_loop && !job->completed) {
        block_job_drain(job);
    }
    while (!job->completed) {
        aio_poll(qemu_get_aio_context(), true);
    }
    ret = (job_is_cancelled(&job->job) && job->ret == 0)
          ? -ECANCELED : job->ret;
    job_unref(&job->job);
    return ret;
}

static void block_job_completed_txn_abort(BlockJob *job)
{
    AioContext *ctx;
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    block_job_txn_ref(txn);

    /* We are the first failed job. Cancel other jobs. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        ctx = blk_get_aio_context(other_job->blk);
        aio_context_acquire(ctx);
    }

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            block_job_cancel_async(other_job, false);
        }
    }
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        ctx = blk_get_aio_context(other_job->blk);
        if (!other_job->completed) {
            assert(job_is_cancelled(&other_job->job));
            block_job_finish_sync(other_job, NULL, NULL);
        }
        block_job_finalize_single(other_job);
        aio_context_release(ctx);
    }

    block_job_txn_unref(txn);
}

static int block_job_needs_finalize(BlockJob *job)
{
    return !job->auto_finalize;
}

static void block_job_do_finalize(BlockJob *job)
{
    int rc;
    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = block_job_txn_apply(job->txn, block_job_prepare, true);
    if (rc) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_txn_apply(job->txn, block_job_finalize_single, true);
    }
}

static void block_job_completed_txn_success(BlockJob *job)
{
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job;

    job_state_transition(&job->job, JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!other_job->completed) {
            return;
        }
        assert(other_job->ret == 0);
    }

    block_job_txn_apply(txn, block_job_event_pending, false);

    /* If no jobs need manual finalization, automatically do so */
    if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) {
        block_job_do_finalize(job);
    }
}

void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
    int64_t old_speed = job->speed;

    if (job_apply_verb(&job->job, JOB_VERB_SET_SPEED, errp)) {
        return;
    }
    if (speed < 0) {
        error_setg(errp, QERR_INVALID_PARAMETER, "speed");
        return;
    }

    ratelimit_set_speed(&job->limit, speed, BLOCK_JOB_SLICE_TIME);

    job->speed = speed;
    if (speed && speed <= old_speed) {
        return;
    }

    /* kick only if a timer is pending */
    block_job_enter_cond(job, block_job_timer_pending);
}

int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
{
    if (!job->speed) {
        return 0;
    }

    return ratelimit_calculate_delay(&job->limit, n);
}
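
/*
 * Informal usage sketch: a driver's main loop typically throttles itself by
 * asking for the delay implied by the current speed limit and then sleeping
 * for it (a zero delay still passes through a pause point):
 *
 *     int64_t delay_ns = block_job_ratelimit_get_delay(job, n_bytes);
 *     block_job_sleep_ns(job, delay_ns);
 */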

void block_job_complete(BlockJob *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->job.id);
    if (job_apply_verb(&job->job, JOB_VERB_COMPLETE, errp)) {
        return;
    }
    if (job->pause_count || job_is_cancelled(&job->job) ||
        !job->driver->complete)
    {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->job.id);
        return;
    }

    job->driver->complete(job, errp);
}

void block_job_finalize(BlockJob *job, Error **errp)
{
    assert(job && job->job.id);
    if (job_apply_verb(&job->job, JOB_VERB_FINALIZE, errp)) {
        return;
    }
    block_job_do_finalize(job);
}

void block_job_dismiss(BlockJob **jobptr, Error **errp)
{
    BlockJob *job = *jobptr;
    /* similarly to _complete, this is QMP-interface only. */
    assert(job->job.id);
    if (job_apply_verb(&job->job, JOB_VERB_DISMISS, errp)) {
        return;
    }

    block_job_do_dismiss(job);
    *jobptr = NULL;
}

void block_job_user_pause(BlockJob *job, Error **errp)
{
    if (job_apply_verb(&job->job, JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    block_job_pause(job);
}

bool block_job_user_paused(BlockJob *job)
{
    return job->user_paused;
}

void block_job_user_resume(BlockJob *job, Error **errp)
{
    assert(job);
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (job_apply_verb(&job->job, JOB_VERB_RESUME, errp)) {
        return;
    }
    block_job_iostatus_reset(job);
    job->user_paused = false;
    block_job_resume(job);
}

void block_job_cancel(BlockJob *job, bool force)
{
    if (job->job.status == JOB_STATUS_CONCLUDED) {
        block_job_do_dismiss(job);
        return;
    }
    block_job_cancel_async(job, force);
    if (!block_job_started(job)) {
        block_job_completed(job, -ECANCELED);
    } else if (job->deferred_to_main_loop) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_enter(job);
    }
}

void block_job_user_cancel(BlockJob *job, bool force, Error **errp)
{
    if (job_apply_verb(&job->job, JOB_VERB_CANCEL, errp)) {
        return;
    }
    block_job_cancel(job, force);
}

/* A wrapper around block_job_cancel() taking an Error ** parameter so it may be
 * used with block_job_finish_sync() without the need for (rather nasty)
 * function pointer casts there. */
static void block_job_cancel_err(BlockJob *job, Error **errp)
{
    block_job_cancel(job, false);
}

int block_job_cancel_sync(BlockJob *job)
{
    return block_job_finish_sync(job, &block_job_cancel_err, NULL);
}

void block_job_cancel_sync_all(void)
{
    BlockJob *job;
    AioContext *aio_context;

    while ((job = block_job_next(NULL))) {
        aio_context = blk_get_aio_context(job->blk);
        aio_context_acquire(aio_context);
        block_job_cancel_sync(job);
        aio_context_release(aio_context);
    }
}

int block_job_complete_sync(BlockJob *job, Error **errp)
{
    return block_job_finish_sync(job, &block_job_complete, errp);
}

void block_job_progress_update(BlockJob *job, uint64_t done)
{
    job->offset += done;
}

void block_job_progress_set_remaining(BlockJob *job, uint64_t remaining)
{
    job->len = job->offset + remaining;
}

BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
{
    BlockJobInfo *info;

    if (block_job_is_internal(job)) {
        error_setg(errp, "Cannot query QEMU internal jobs");
        return NULL;
    }
    info = g_new0(BlockJobInfo, 1);
    info->type = g_strdup(job_type_str(&job->job));
    info->device = g_strdup(job->job.id);
    info->len = job->len;
    info->busy = atomic_read(&job->busy);
    info->paused = job->pause_count > 0;
    info->offset = job->offset;
    info->speed = job->speed;
    info->io_status = job->iostatus;
    info->ready = job->ready;
    info->status = job->job.status;
    info->auto_finalize = job->auto_finalize;
    info->auto_dismiss = job->auto_dismiss;
    info->has_error = job->ret != 0;
    info->error = job->ret ? g_strdup(strerror(-job->ret)) : NULL;
    return info;
}

static void block_job_iostatus_set_err(BlockJob *job, int error)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
                                          BLOCK_DEVICE_IO_STATUS_FAILED;
    }
}

static void block_job_event_cancelled(BlockJob *job)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_cancelled(job_type(&job->job),
                                        job->job.id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        &error_abort);
}

static void block_job_event_completed(BlockJob *job, const char *msg)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_completed(job_type(&job->job),
                                        job->job.id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        !!msg,
                                        msg,
                                        &error_abort);
}

static int block_job_event_pending(BlockJob *job)
{
    job_state_transition(&job->job, JOB_STATUS_PENDING);
    if (!job->auto_finalize && !block_job_is_internal(job)) {
        qapi_event_send_block_job_pending(job_type(&job->job),
                                          job->job.id,
                                          &error_abort);
    }
    return 0;
}

/*
 * API for block job drivers and the block layer. These functions are
 * declared in blockjob_int.h.
 */

void *block_job_create(const char *job_id, const BlockJobDriver *driver,
                       BlockJobTxn *txn, BlockDriverState *bs, uint64_t perm,
                       uint64_t shared_perm, int64_t speed, int flags,
                       BlockCompletionFunc *cb, void *opaque, Error **errp)
{
    BlockBackend *blk;
    BlockJob *job;
    int ret;

    if (bs->job) {
        error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs));
        return NULL;
    }

    if (job_id == NULL && !(flags & BLOCK_JOB_INTERNAL)) {
        job_id = bdrv_get_device_name(bs);
        if (!*job_id) {
            error_setg(errp, "An explicit job ID is required for this node");
            return NULL;
        }
    }

    if (job_id) {
        if (flags & BLOCK_JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal block job");
            return NULL;
        }
    }

    blk = blk_new(perm, shared_perm);
    ret = blk_insert_bs(blk, bs, errp);
    if (ret < 0) {
        blk_unref(blk);
        return NULL;
    }

    job = job_create(job_id, &driver->job_driver, blk_get_aio_context(blk),
                     errp);
    if (job == NULL) {
        blk_unref(blk);
        return NULL;
    }

    assert(is_block_job(&job->job));
    assert(job->job.driver->free == &block_job_free);

    job->driver = driver;
    job->blk = blk;
    job->cb = cb;
    job->opaque = opaque;
    job->busy = false;
    job->paused = true;
    job->pause_count = 1;
    job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE);
    job->auto_dismiss = !(flags & BLOCK_JOB_MANUAL_DISMISS);
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   block_job_sleep_timer_cb, job);

    error_setg(&job->blocker, "block device is in use by block job: %s",
               job_type_str(&job->job));
    block_job_add_bdrv(job, "main node", bs, 0, BLK_PERM_ALL, &error_abort);
    bs->job = job;

    bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);

    blk_add_aio_context_notifier(blk, block_job_attached_aio_context,
                                 block_job_detach_aio_context, job);

    /* Only set speed when necessary to avoid NotSupported error */
    if (speed != 0) {
        Error *local_err = NULL;

        block_job_set_speed(job, speed, &local_err);
        if (local_err) {
            block_job_early_fail(job);
            error_propagate(errp, local_err);
            return NULL;
        }
    }

    /* Single jobs are modeled as single-job transactions for the sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = block_job_txn_new();
        block_job_txn_add_job(txn, job);
        block_job_txn_unref(txn);
    } else {
        block_job_txn_add_job(txn, job);
    }

    return job;
}
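
/*
 * Informal usage sketch (names hypothetical): a driver allocates its job
 * through block_job_create() and starts it once set-up has succeeded.  A
 * stream-like job over node 'bs' might look roughly like:
 *
 *     ExampleBlockJob *s;
 *
 *     s = block_job_create(job_id, &example_job_driver, NULL, bs,
 *                          BLK_PERM_CONSISTENT_READ, BLK_PERM_ALL, speed,
 *                          BLOCK_JOB_DEFAULT, NULL, NULL, errp);
 *     if (!s) {
 *         return;
 *     }
 *     block_job_start(&s->common);
 */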

void block_job_early_fail(BlockJob *job)
{
    assert(job->job.status == JOB_STATUS_CREATED);
    block_job_decommission(job);
}

void block_job_completed(BlockJob *job, int ret)
{
    assert(job && job->txn && !job->completed);
    assert(blk_bs(job->blk)->job == job);
    job->completed = true;
    job->ret = ret;
    block_job_update_rc(job);
    trace_block_job_completed(job, ret, job->ret);
    if (job->ret) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_completed_txn_success(job);
    }
}

static bool block_job_should_pause(BlockJob *job)
{
    return job->pause_count > 0;
}

/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with block_job_enter() before the timer has
 * expired is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
 * called explicitly. */
static void block_job_do_yield(BlockJob *job, uint64_t ns)
{
    block_job_lock();
    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    job->busy = false;
    block_job_unlock();
    qemu_coroutine_yield();

    /* Set by block_job_enter before re-entering the coroutine. */
    assert(job->busy);
}

void coroutine_fn block_job_pause_point(BlockJob *job)
{
    assert(job && block_job_started(job));

    if (!block_job_should_pause(job)) {
        return;
    }
    if (job_is_cancelled(&job->job)) {
        return;
    }

    if (job->driver->pause) {
        job->driver->pause(job);
    }

    if (block_job_should_pause(job) && !job_is_cancelled(&job->job)) {
        JobStatus status = job->job.status;
        job_state_transition(&job->job, status == JOB_STATUS_READY
                                        ? JOB_STATUS_STANDBY
                                        : JOB_STATUS_PAUSED);
        job->paused = true;
        block_job_do_yield(job, -1);
        job->paused = false;
        job_state_transition(&job->job, status);
    }

    if (job->driver->resume) {
        job->driver->resume(job);
    }
}
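
/*
 * Informal note: drivers are expected to reach a pause point regularly,
 * either by calling block_job_pause_point() directly from their main loop or
 * indirectly via block_job_sleep_ns()/block_job_yield().  A driver that
 * never yields would prevent pausing and AioContext detach (which both wait
 * for the job to pause or complete) from making progress.
 */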

/*
 * Conditionally enter the block job: the coroutine is re-entered only if the
 * job has been started, has not deferred to the main loop, is not busy and,
 * if @fn is non-NULL, fn() returns true.  fn() is called under the
 * block_job_lock critical section.
 */
static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job))
{
    if (!block_job_started(job)) {
        return;
    }
    if (job->deferred_to_main_loop) {
        return;
    }

    block_job_lock();
    if (job->busy) {
        block_job_unlock();
        return;
    }

    if (fn && !fn(job)) {
        block_job_unlock();
        return;
    }

    assert(!job->deferred_to_main_loop);
    timer_del(&job->sleep_timer);
    job->busy = true;
    block_job_unlock();
    aio_co_wake(job->co);
}

void block_job_enter(BlockJob *job)
{
    block_job_enter_cond(job, NULL);
}

void block_job_sleep_ns(BlockJob *job, int64_t ns)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too! */
    if (job_is_cancelled(&job->job)) {
        return;
    }

    if (!block_job_should_pause(job)) {
        block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    block_job_pause_point(job);
}

void block_job_yield(BlockJob *job)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too! */
    if (job_is_cancelled(&job->job)) {
        return;
    }

    if (!block_job_should_pause(job)) {
        block_job_do_yield(job, -1);
    }

    block_job_pause_point(job);
}

void block_job_iostatus_reset(BlockJob *job)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        return;
    }
    assert(job->user_paused && job->pause_count > 0);
    job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
}

void block_job_event_ready(BlockJob *job)
{
    job_state_transition(&job->job, JOB_STATUS_READY);
    job->ready = true;

    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_ready(job_type(&job->job),
                                    job->job.id,
                                    job->len,
                                    job->offset,
                                    job->speed, &error_abort);
}

BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                        int is_read, int error)
{
    BlockErrorAction action;

    switch (on_err) {
    case BLOCKDEV_ON_ERROR_ENOSPC:
    case BLOCKDEV_ON_ERROR_AUTO:
        action = (error == ENOSPC) ?
                 BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_STOP:
        action = BLOCK_ERROR_ACTION_STOP;
        break;
    case BLOCKDEV_ON_ERROR_REPORT:
        action = BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_IGNORE:
        action = BLOCK_ERROR_ACTION_IGNORE;
        break;
    default:
        abort();
    }
    if (!block_job_is_internal(job)) {
        qapi_event_send_block_job_error(job->job.id,
                                        is_read ? IO_OPERATION_TYPE_READ :
                                        IO_OPERATION_TYPE_WRITE,
                                        action, &error_abort);
    }
    if (action == BLOCK_ERROR_ACTION_STOP) {
        block_job_pause(job);
        /* make the pause user-visible; it will be resumed from QMP. */
        job->user_paused = true;
        block_job_iostatus_set_err(job, error);
    }
    return action;
}
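
/*
 * Informal usage sketch: a driver's I/O error path typically maps the errno
 * through the configured policy and only gives up when asked to report
 * (where 'ret' is a negative errno from the failed request):
 *
 *     BlockErrorAction action = block_job_error_action(job, on_error,
 *                                                      is_read, -ret);
 *     if (action == BLOCK_ERROR_ACTION_REPORT) {
 *         return ret;
 *     }
 */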

typedef struct {
    BlockJob *job;
    AioContext *aio_context;
    BlockJobDeferToMainLoopFn *fn;
    void *opaque;
} BlockJobDeferToMainLoopData;

static void block_job_defer_to_main_loop_bh(void *opaque)
{
    BlockJobDeferToMainLoopData *data = opaque;
    AioContext *aio_context;

    /* Prevent race with block_job_defer_to_main_loop() */
    aio_context_acquire(data->aio_context);

    /* Fetch BDS AioContext again, in case it has changed */
    aio_context = blk_get_aio_context(data->job->blk);
    if (aio_context != data->aio_context) {
        aio_context_acquire(aio_context);
    }

    data->fn(data->job, data->opaque);

    if (aio_context != data->aio_context) {
        aio_context_release(aio_context);
    }

    aio_context_release(data->aio_context);

    g_free(data);
}

void block_job_defer_to_main_loop(BlockJob *job,
                                  BlockJobDeferToMainLoopFn *fn,
                                  void *opaque)
{
    BlockJobDeferToMainLoopData *data = g_malloc(sizeof(*data));
    data->job = job;
    data->aio_context = blk_get_aio_context(job->blk);
    data->fn = fn;
    data->opaque = opaque;
    job->deferred_to_main_loop = true;

    aio_bh_schedule_oneshot(qemu_get_aio_context(),
                            block_job_defer_to_main_loop_bh, data);
}
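
/*
 * Informal usage sketch (names hypothetical): a job finishes by bouncing its
 * completion into the main loop, where graph manipulation is safe:
 *
 *     static void example_job_exit(BlockJob *job, void *opaque)
 *     {
 *         ExampleBlockJob *s = opaque;
 *         block_job_completed(job, s->ret);
 *     }
 *
 * and then, at the end of the driver's .start coroutine:
 *
 *     block_job_defer_to_main_loop(&s->common, example_job_exit, s);
 */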