/*
 * Graph lock: rwlock to protect block layer graph manipulations (add/remove
 * edges and nodes)
 *
 * Copyright (c) 2022 Red Hat
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
 */

#include "qemu/osdep.h"
#include "qemu/main-loop.h"
#include "block/graph-lock.h"
#include "block/block.h"
#include "block/block_int.h"

/* Protects the list of aiocontext and orphaned_reader_count */
static QemuMutex aio_context_list_lock;

/* Written and read with atomic operations. */
static int has_writer;

/*
 * A reader coroutine could move from an AioContext to another.
 * If this happens, there is no problem from the point of view of
 * counters. The problem is that the total count becomes
 * unbalanced if one of the two AioContexts gets deleted.
 * The count of readers must remain correct, so the AioContext's
 * balance is transferred to this global variable.
 * Protected by aio_context_list_lock.
 */
static uint32_t orphaned_reader_count;

/* Queue of readers waiting for the writer to finish */
static CoQueue reader_queue;

struct BdrvGraphRWlock {
    /* How many readers are currently reading the graph. */
    uint32_t reader_count;

    /*
     * List of BdrvGraphRWlock kept in graph-lock.c
     * Protected by aio_context_list_lock
     */
    QTAILQ_ENTRY(BdrvGraphRWlock) next_aio;
};

/*
 * List of BdrvGraphRWlock. This list ensures that each BdrvGraphRWlock
 * can safely modify only its own counter, avoiding reads and writes of
 * the others and thus improving performance by avoiding cacheline bounces.
 */
static QTAILQ_HEAD(, BdrvGraphRWlock) aio_context_list =
    QTAILQ_HEAD_INITIALIZER(aio_context_list);

static void __attribute__((__constructor__)) bdrv_init_graph_lock(void)
{
    qemu_mutex_init(&aio_context_list_lock);
    qemu_co_queue_init(&reader_queue);
}

void register_aiocontext(AioContext *ctx)
{
    ctx->bdrv_graph = g_new0(BdrvGraphRWlock, 1);
    QEMU_LOCK_GUARD(&aio_context_list_lock);
    assert(ctx->bdrv_graph->reader_count == 0);
    QTAILQ_INSERT_TAIL(&aio_context_list, ctx->bdrv_graph, next_aio);
}

void unregister_aiocontext(AioContext *ctx)
{
    QEMU_LOCK_GUARD(&aio_context_list_lock);
    orphaned_reader_count += ctx->bdrv_graph->reader_count;
    QTAILQ_REMOVE(&aio_context_list, ctx->bdrv_graph, next_aio);
    g_free(ctx->bdrv_graph);
}

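/*
 * Illustrative sketch, not part of the original file: the expected pairing of
 * the two registration helpers above, assuming they are called from the code
 * that creates and tears down an AioContext (for example an iothread). The
 * surrounding calls only show the intended ordering and are not prescribed by
 * this file.
 *
 *     AioContext *ctx = aio_context_new(&error_abort);
 *     register_aiocontext(ctx);    // per-context reader counter now exists
 *     ...run block I/O in this context...
 *     unregister_aiocontext(ctx);  // remaining balance moves to
 *                                  // orphaned_reader_count
 *     aio_context_unref(ctx);
 */
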
static uint32_t reader_count(void)
{
    BdrvGraphRWlock *bdrv_graph;
    uint32_t rd;

    QEMU_LOCK_GUARD(&aio_context_list_lock);

    /* rd can temporarily be negative, but the total will *always* be >= 0 */
    rd = orphaned_reader_count;
    QTAILQ_FOREACH(bdrv_graph, &aio_context_list, next_aio) {
        rd += qatomic_read(&bdrv_graph->reader_count);
    }

    /* shouldn't overflow unless there are 2^31 readers */
    assert((int32_t)rd >= 0);
    return rd;
}

void bdrv_graph_wrlock(void)
{
    GLOBAL_STATE_CODE();
    assert(!qatomic_read(&has_writer));

    /* Make sure that constantly arriving new I/O doesn't cause starvation */
    bdrv_drain_all_begin_nopoll();

    /*
     * reader_count == 0: this means the readers will read has_writer as 1
     * reader_count >= 1: we don't know if the reader read has_writer == 0
     *                    or 1, but we need to wait.
     * Wait by allowing other coroutines (and possible readers) to continue.
     */
    do {
        /*
         * has_writer must be 0 while polling, otherwise we get a deadlock if
         * any callback involved during AIO_WAIT_WHILE() tries to acquire the
         * reader lock.
         */
        qatomic_set(&has_writer, 0);
        AIO_WAIT_WHILE(qemu_get_aio_context(), reader_count() >= 1);
        qatomic_set(&has_writer, 1);

        /*
         * We want to only check reader_count() after has_writer = 1 is
         * visible to other threads. That way no more readers can sneak in
         * after we've determined reader_count() == 0.
         */
        smp_mb();
    } while (reader_count() >= 1);

    bdrv_drain_all_end();
}

void bdrv_graph_wrunlock(void)
{
    GLOBAL_STATE_CODE();
    QEMU_LOCK_GUARD(&aio_context_list_lock);
    assert(qatomic_read(&has_writer));

    /*
     * No need for memory barriers: this is paired with the slow path of
     * rdlock() and both take the lock.
     */
    qatomic_store_release(&has_writer, 0);

    /* Wake up all coroutines that are waiting to read the graph */
    qemu_co_enter_all(&reader_queue, &aio_context_list_lock);
}

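/*
 * Illustrative sketch, not part of the original file: graph-modifying code in
 * the main loop is expected to bracket the modification with the writer lock,
 * roughly as below. The helper name and the operation in the middle are
 * hypothetical; only the lock/unlock pairing comes from this file.
 *
 *     static void bdrv_example_reparent(BlockDriverState *parent,
 *                                       BlockDriverState *child)
 *     {
 *         bdrv_graph_wrlock();
 *         ...add or remove BdrvChild edges between parent and child...
 *         bdrv_graph_wrunlock();
 *     }
 */
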
void coroutine_fn bdrv_graph_co_rdlock(void)
{
    BdrvGraphRWlock *bdrv_graph;
    bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;

    /* Do not lock if in main thread */
    if (qemu_in_main_thread()) {
        return;
    }

    for (;;) {
        qatomic_set(&bdrv_graph->reader_count,
                    bdrv_graph->reader_count + 1);
        /* make sure writer sees reader_count before we check has_writer */
        smp_mb();

        /*
         * has_writer == 0: this means writer will read reader_count as >= 1
         * has_writer == 1: we don't know if writer read reader_count == 0
         *                  or > 0, but we need to wait anyway because
         *                  it will write.
         */
        if (!qatomic_read(&has_writer)) {
            break;
        }

        /*
         * Synchronize access with reader_count() in bdrv_graph_wrlock().
         * Case 1:
         * If this critical section gets executed first, reader_count will
         * decrease and the reader will go to sleep.
         * Then the writer will read reader_count that does not take into
         * account this reader, and if there's no other reader it will
         * enter the write section.
         * Case 2:
         * If reader_count() critical section gets executed first,
         * then writer will read reader_count >= 1.
         * It will wait in AIO_WAIT_WHILE(), but once it releases the lock
         * we will enter this critical section and call aio_wait_kick().
         */
        WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) {
            /*
             * Additional check when we use the above lock to synchronize
             * with bdrv_graph_wrunlock().
             * Case 1:
             * If this gets executed first, has_writer is still 1, so we
             * reduce reader_count and go to sleep.
             * Then the writer will set has_writer to 0 and wake up all
             * readers, us included.
             * Case 2:
             * If bdrv_graph_wrunlock() critical section gets executed first,
             * then it will set has_writer to 0 and wake up all other readers.
             * Then we execute this critical section, and therefore must check
             * again for has_writer, otherwise we sleep without any writer
             * actually running. In that case the read lock is already held
             * (reader_count was incremented above), so we are done.
             */
            if (!qatomic_read(&has_writer)) {
                return;
            }

            /* slow path where reader sleeps */
            bdrv_graph->reader_count--;

            /* make the writer's AIO_WAIT_WHILE() re-read the counters */
            aio_wait_kick();

            qemu_co_queue_wait(&reader_queue, &aio_context_list_lock);
        }
    }
}

void coroutine_fn bdrv_graph_co_rdunlock(void)
{
    BdrvGraphRWlock *bdrv_graph;
    bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;

    /* Do not lock if in main thread */
    if (qemu_in_main_thread()) {
        return;
    }

    qatomic_store_release(&bdrv_graph->reader_count,
                          bdrv_graph->reader_count - 1);
    /* make sure writer sees reader_count before we check has_writer */
    smp_mb();

    /*
     * has_writer == 0: this means the writer will read the decreased
     *                  reader_count
     * has_writer == 1: we don't know if the writer read the old or the new
     *                  reader_count. Therefore, kick again so on the next
     *                  iteration the writer will for sure read the updated
     *                  value.
     */
    if (qatomic_read(&has_writer)) {
        aio_wait_kick();
    }
}

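/*
 * Illustrative sketch, not part of the original file: coroutine code that
 * walks the graph is expected to hold the reader lock across the whole
 * traversal, roughly as below. The helper name and the traversal are
 * hypothetical; only the lock/unlock pairing comes from this file.
 *
 *     static int coroutine_fn example_backing_chain_depth(BlockDriverState *bs)
 *     {
 *         int depth = 0;
 *
 *         bdrv_graph_co_rdlock();
 *         while (bs) {
 *             depth++;
 *             bs = bs->backing ? bs->backing->bs : NULL;
 *         }
 *         bdrv_graph_co_rdunlock();
 *
 *         return depth;
 *     }
 */
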
void bdrv_graph_rdlock_main_loop(void)
{
    GLOBAL_STATE_CODE();
    assert(!qemu_in_coroutine());
}

void bdrv_graph_rdunlock_main_loop(void)
{
    GLOBAL_STATE_CODE();
    assert(!qemu_in_coroutine());
}