]> git.proxmox.com Git - mirror_qemu.git/blob - block/graph-lock.c
edk2: update to git snapshot
[mirror_qemu.git] / block / graph-lock.c
1 /*
2 * Graph lock: rwlock to protect block layer graph manipulations (add/remove
3 * edges and nodes)
4 *
5 * Copyright (c) 2022 Red Hat
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "qemu/osdep.h"
22 #include "qemu/main-loop.h"
23 #include "block/graph-lock.h"
24 #include "block/block.h"
25 #include "block/block_int.h"
26
27 /* Dummy lock object to use for Thread Safety Analysis (TSA) */
28 BdrvGraphLock graph_lock;
29
30 /* Protects the list of aiocontext and orphaned_reader_count */
31 static QemuMutex aio_context_list_lock;
32
33 /* Written and read with atomic operations. */
34 static int has_writer;
35
36 /*
37 * A reader coroutine could move from an AioContext to another.
38 * If this happens, there is no problem from the point of view of
39 * counters. The problem is that the total count becomes
40 * unbalanced if one of the two AioContexts gets deleted.
41 * The count of readers must remain correct, so the AioContext's
42 * balance is transferred to this glboal variable.
43 * Protected by aio_context_list_lock.
44 */
45 static uint32_t orphaned_reader_count;
46
47 /* Queue of readers waiting for the writer to finish */
48 static CoQueue reader_queue;
49
50 struct BdrvGraphRWlock {
51 /* How many readers are currently reading the graph. */
52 uint32_t reader_count;
53
54 /*
55 * List of BdrvGraphRWlock kept in graph-lock.c
56 * Protected by aio_context_list_lock
57 */
58 QTAILQ_ENTRY(BdrvGraphRWlock) next_aio;
59 };
60
61 /*
62 * List of BdrvGraphRWlock. This list ensures that each BdrvGraphRWlock
63 * can safely modify only its own counter, avoid reading/writing
64 * others and thus improving performances by avoiding cacheline bounces.
65 */
66 static QTAILQ_HEAD(, BdrvGraphRWlock) aio_context_list =
67 QTAILQ_HEAD_INITIALIZER(aio_context_list);
68
69 static void __attribute__((__constructor__)) bdrv_init_graph_lock(void)
70 {
71 qemu_mutex_init(&aio_context_list_lock);
72 qemu_co_queue_init(&reader_queue);
73 }
74
75 void register_aiocontext(AioContext *ctx)
76 {
77 ctx->bdrv_graph = g_new0(BdrvGraphRWlock, 1);
78 QEMU_LOCK_GUARD(&aio_context_list_lock);
79 assert(ctx->bdrv_graph->reader_count == 0);
80 QTAILQ_INSERT_TAIL(&aio_context_list, ctx->bdrv_graph, next_aio);
81 }
82
83 void unregister_aiocontext(AioContext *ctx)
84 {
85 QEMU_LOCK_GUARD(&aio_context_list_lock);
86 orphaned_reader_count += ctx->bdrv_graph->reader_count;
87 QTAILQ_REMOVE(&aio_context_list, ctx->bdrv_graph, next_aio);
88 g_free(ctx->bdrv_graph);
89 }
90
91 static uint32_t reader_count(void)
92 {
93 BdrvGraphRWlock *brdv_graph;
94 uint32_t rd;
95
96 QEMU_LOCK_GUARD(&aio_context_list_lock);
97
98 /* rd can temporarily be negative, but the total will *always* be >= 0 */
99 rd = orphaned_reader_count;
100 QTAILQ_FOREACH(brdv_graph, &aio_context_list, next_aio) {
101 rd += qatomic_read(&brdv_graph->reader_count);
102 }
103
104 /* shouldn't overflow unless there are 2^31 readers */
105 assert((int32_t)rd >= 0);
106 return rd;
107 }
108
109 void no_coroutine_fn bdrv_graph_wrlock(BlockDriverState *bs)
110 {
111 AioContext *ctx = NULL;
112
113 GLOBAL_STATE_CODE();
114 assert(!qatomic_read(&has_writer));
115 assert(!qemu_in_coroutine());
116
117 /*
118 * Release only non-mainloop AioContext. The mainloop often relies on the
119 * BQL and doesn't lock the main AioContext before doing things.
120 */
121 if (bs) {
122 ctx = bdrv_get_aio_context(bs);
123 if (ctx != qemu_get_aio_context()) {
124 aio_context_release(ctx);
125 } else {
126 ctx = NULL;
127 }
128 }
129
130 /* Make sure that constantly arriving new I/O doesn't cause starvation */
131 bdrv_drain_all_begin_nopoll();
132
133 /*
134 * reader_count == 0: this means writer will read has_reader as 1
135 * reader_count >= 1: we don't know if writer read has_writer == 0 or 1,
136 * but we need to wait.
137 * Wait by allowing other coroutine (and possible readers) to continue.
138 */
139 do {
140 /*
141 * has_writer must be 0 while polling, otherwise we get a deadlock if
142 * any callback involved during AIO_WAIT_WHILE() tries to acquire the
143 * reader lock.
144 */
145 qatomic_set(&has_writer, 0);
146 AIO_WAIT_WHILE_UNLOCKED(NULL, reader_count() >= 1);
147 qatomic_set(&has_writer, 1);
148
149 /*
150 * We want to only check reader_count() after has_writer = 1 is visible
151 * to other threads. That way no more readers can sneak in after we've
152 * determined reader_count() == 0.
153 */
154 smp_mb();
155 } while (reader_count() >= 1);
156
157 bdrv_drain_all_end();
158
159 if (ctx) {
160 aio_context_acquire(bdrv_get_aio_context(bs));
161 }
162 }
163
164 void no_coroutine_fn bdrv_graph_wrunlock_ctx(AioContext *ctx)
165 {
166 GLOBAL_STATE_CODE();
167 assert(qatomic_read(&has_writer));
168
169 /*
170 * Release only non-mainloop AioContext. The mainloop often relies on the
171 * BQL and doesn't lock the main AioContext before doing things.
172 */
173 if (ctx && ctx != qemu_get_aio_context()) {
174 aio_context_release(ctx);
175 } else {
176 ctx = NULL;
177 }
178
179 WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) {
180 /*
181 * No need for memory barriers, this works in pair with
182 * the slow path of rdlock() and both take the lock.
183 */
184 qatomic_store_release(&has_writer, 0);
185
186 /* Wake up all coroutines that are waiting to read the graph */
187 qemu_co_enter_all(&reader_queue, &aio_context_list_lock);
188 }
189
190 /*
191 * Run any BHs that were scheduled during the wrlock section and that
192 * callers might expect to have finished (in particular, this is important
193 * for bdrv_schedule_unref()).
194 *
195 * Do this only after restarting coroutines so that nested event loops in
196 * BHs don't deadlock if their condition relies on the coroutine making
197 * progress.
198 */
199 aio_bh_poll(qemu_get_aio_context());
200
201 if (ctx) {
202 aio_context_acquire(ctx);
203 }
204 }
205
206 void no_coroutine_fn bdrv_graph_wrunlock(BlockDriverState *bs)
207 {
208 AioContext *ctx = bs ? bdrv_get_aio_context(bs) : NULL;
209
210 bdrv_graph_wrunlock_ctx(ctx);
211 }
212
213 void coroutine_fn bdrv_graph_co_rdlock(void)
214 {
215 BdrvGraphRWlock *bdrv_graph;
216 bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;
217
218 for (;;) {
219 qatomic_set(&bdrv_graph->reader_count,
220 bdrv_graph->reader_count + 1);
221 /* make sure writer sees reader_count before we check has_writer */
222 smp_mb();
223
224 /*
225 * has_writer == 0: this means writer will read reader_count as >= 1
226 * has_writer == 1: we don't know if writer read reader_count == 0
227 * or > 0, but we need to wait anyways because
228 * it will write.
229 */
230 if (!qatomic_read(&has_writer)) {
231 break;
232 }
233
234 /*
235 * Synchronize access with reader_count() in bdrv_graph_wrlock().
236 * Case 1:
237 * If this critical section gets executed first, reader_count will
238 * decrease and the reader will go to sleep.
239 * Then the writer will read reader_count that does not take into
240 * account this reader, and if there's no other reader it will
241 * enter the write section.
242 * Case 2:
243 * If reader_count() critical section gets executed first,
244 * then writer will read reader_count >= 1.
245 * It will wait in AIO_WAIT_WHILE(), but once it releases the lock
246 * we will enter this critical section and call aio_wait_kick().
247 */
248 WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) {
249 /*
250 * Additional check when we use the above lock to synchronize
251 * with bdrv_graph_wrunlock().
252 * Case 1:
253 * If this gets executed first, has_writer is still 1, so we reduce
254 * reader_count and go to sleep.
255 * Then the writer will set has_writer to 0 and wake up all readers,
256 * us included.
257 * Case 2:
258 * If bdrv_graph_wrunlock() critical section gets executed first,
259 * then it will set has_writer to 0 and wake up all other readers.
260 * Then we execute this critical section, and therefore must check
261 * again for has_writer, otherwise we sleep without any writer
262 * actually running.
263 */
264 if (!qatomic_read(&has_writer)) {
265 return;
266 }
267
268 /* slow path where reader sleeps */
269 bdrv_graph->reader_count--;
270 aio_wait_kick();
271 qemu_co_queue_wait(&reader_queue, &aio_context_list_lock);
272 }
273 }
274 }
275
276 void coroutine_fn bdrv_graph_co_rdunlock(void)
277 {
278 BdrvGraphRWlock *bdrv_graph;
279 bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;
280
281 qatomic_store_release(&bdrv_graph->reader_count,
282 bdrv_graph->reader_count - 1);
283 /* make sure writer sees reader_count before we check has_writer */
284 smp_mb();
285
286 /*
287 * has_writer == 0: this means reader will read reader_count decreased
288 * has_writer == 1: we don't know if writer read reader_count old or
289 * new. Therefore, kick again so on next iteration
290 * writer will for sure read the updated value.
291 */
292 if (qatomic_read(&has_writer)) {
293 aio_wait_kick();
294 }
295 }
296
297 void bdrv_graph_rdlock_main_loop(void)
298 {
299 GLOBAL_STATE_CODE();
300 assert(!qemu_in_coroutine());
301 }
302
303 void bdrv_graph_rdunlock_main_loop(void)
304 {
305 GLOBAL_STATE_CODE();
306 assert(!qemu_in_coroutine());
307 }
308
309 void assert_bdrv_graph_readable(void)
310 {
311 /* reader_count() is slow due to aio_context_list_lock lock contention */
312 #ifdef CONFIG_DEBUG_GRAPH_LOCK
313 assert(qemu_in_main_thread() || reader_count());
314 #endif
315 }
316
317 void assert_bdrv_graph_writable(void)
318 {
319 assert(qemu_in_main_thread());
320 assert(qatomic_read(&has_writer));
321 }