xref: /qemu/block/graph-lock.c (revision 7c1f51bf)
1 /*
2  * Graph lock: rwlock to protect block layer graph manipulations (add/remove
3  * edges and nodes)
4  *
5  *  Copyright (c) 2022 Red Hat
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
19  */
20 
21 #include "qemu/osdep.h"
22 #include "qemu/main-loop.h"
23 #include "block/graph-lock.h"
24 #include "block/block.h"
25 #include "block/block_int.h"
26 
27 /* Dummy lock object to use for Thread Safety Analysis (TSA) */
28 BdrvGraphLock graph_lock;
29 
30 /* Protects the list of aiocontext and orphaned_reader_count */
31 static QemuMutex aio_context_list_lock;
32 
33 #if 0
34 /* Written and read with atomic operations. */
35 static int has_writer;
36 #endif
37 
38 /*
39  * A reader coroutine could move from an AioContext to another.
40  * If this happens, there is no problem from the point of view of
41  * counters. The problem is that the total count becomes
42  * unbalanced if one of the two AioContexts gets deleted.
43  * The count of readers must remain correct, so the AioContext's
44  * balance is transferred to this glboal variable.
45  * Protected by aio_context_list_lock.
46  */
47 static uint32_t orphaned_reader_count;
48 
49 /* Queue of readers waiting for the writer to finish */
50 static CoQueue reader_queue;
51 
52 struct BdrvGraphRWlock {
53     /* How many readers are currently reading the graph. */
54     uint32_t reader_count;
55 
56     /*
57      * List of BdrvGraphRWlock kept in graph-lock.c
58      * Protected by aio_context_list_lock
59      */
60     QTAILQ_ENTRY(BdrvGraphRWlock) next_aio;
61 };
62 
63 /*
64  * List of BdrvGraphRWlock. This list ensures that each BdrvGraphRWlock
65  * can safely modify only its own counter, avoid reading/writing
66  * others and thus improving performances by avoiding cacheline bounces.
67  */
68 static QTAILQ_HEAD(, BdrvGraphRWlock) aio_context_list =
69     QTAILQ_HEAD_INITIALIZER(aio_context_list);
70 
71 static void __attribute__((__constructor__)) bdrv_init_graph_lock(void)
72 {
73     qemu_mutex_init(&aio_context_list_lock);
74     qemu_co_queue_init(&reader_queue);
75 }
76 
77 void register_aiocontext(AioContext *ctx)
78 {
79     ctx->bdrv_graph = g_new0(BdrvGraphRWlock, 1);
80     QEMU_LOCK_GUARD(&aio_context_list_lock);
81     assert(ctx->bdrv_graph->reader_count == 0);
82     QTAILQ_INSERT_TAIL(&aio_context_list, ctx->bdrv_graph, next_aio);
83 }
84 
85 void unregister_aiocontext(AioContext *ctx)
86 {
87     QEMU_LOCK_GUARD(&aio_context_list_lock);
88     orphaned_reader_count += ctx->bdrv_graph->reader_count;
89     QTAILQ_REMOVE(&aio_context_list, ctx->bdrv_graph, next_aio);
90     g_free(ctx->bdrv_graph);
91 }
92 
#if 0
/*
 * Return the total number of readers: orphaned_reader_count plus the
 * counter of every BdrvGraphRWlock in aio_context_list. The whole sum is
 * computed with aio_context_list_lock held.
 */
static uint32_t reader_count(void)
{
    BdrvGraphRWlock *entry;
    uint32_t total;

    QEMU_LOCK_GUARD(&aio_context_list_lock);

    /*
     * The running sum can temporarily be "negative" (wrapped), but the
     * final total will *always* be >= 0.
     */
    total = orphaned_reader_count;
    QTAILQ_FOREACH(entry, &aio_context_list, next_aio) {
        total += qatomic_read(&entry->reader_count);
    }

    /* this can only trip if there are 2^31 readers */
    assert((int32_t)total >= 0);
    return total;
}
#endif
112 
/*
 * Take the graph lock for writing.
 *
 * With the body under "#if 0" this currently only runs GLOBAL_STATE_CODE();
 * the real implementation is kept below for when it can be re-enabled.
 */
void bdrv_graph_wrlock(void)
{
    GLOBAL_STATE_CODE();
    /*
     * TODO Some callers hold an AioContext lock when this is called, which
     * causes deadlocks. Reenable once the AioContext locking is cleaned up (or
     * AioContext locks are gone).
     */
#if 0
    assert(!qatomic_read(&has_writer));

    /* Keep a constant stream of new I/O from starving the writer */
    bdrv_drain_all_begin_nopoll();

    /*
     * Pairs with the reader_count/has_writer handshake in
     * bdrv_graph_co_rdlock():
     * reader_count() == 0: a reader arriving from now on is expected to
     *                      observe has_writer == 1.
     * reader_count() >= 1: we cannot tell whether that reader saw
     *                      has_writer == 0 or 1, so we have to wait.
     * Waiting lets other coroutines (possibly readers) make progress.
     */
    for (;;) {
        /*
         * has_writer has to be 0 while we poll: a callback run from
         * AIO_WAIT_WHILE() that tries to take the reader lock would
         * otherwise deadlock.
         */
        qatomic_set(&has_writer, 0);
        AIO_WAIT_WHILE_UNLOCKED(NULL, reader_count() >= 1);
        qatomic_set(&has_writer, 1);

        /*
         * reader_count() must only be sampled once has_writer = 1 is
         * visible to other threads, so that no reader can slip in after
         * we have seen reader_count() == 0.
         */
        smp_mb();

        if (reader_count() == 0) {
            break;
        }
    }

    bdrv_drain_all_end();
#endif
}
154 
/*
 * Release the graph lock taken by bdrv_graph_wrlock().
 *
 * With the body under "#if 0" this currently only runs GLOBAL_STATE_CODE().
 */
void bdrv_graph_wrunlock(void)
{
    GLOBAL_STATE_CODE();
#if 0
    QEMU_LOCK_GUARD(&aio_context_list_lock);
    assert(qatomic_read(&has_writer));

    /*
     * Memory barriers are not needed here: the counterpart is the slow
     * path of rdlock(), and both sides take aio_context_list_lock.
     */
    qatomic_store_release(&has_writer, 0);

    /* Resume every coroutine that was queued waiting to read the graph */
    qemu_co_enter_all(&reader_queue, &aio_context_list_lock);
#endif
}
172 
173 void coroutine_fn bdrv_graph_co_rdlock(void)
174 {
175     /* TODO Reenable when wrlock is reenabled */
176 #if 0
177     BdrvGraphRWlock *bdrv_graph;
178     bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;
179 
180     for (;;) {
181         qatomic_set(&bdrv_graph->reader_count,
182                     bdrv_graph->reader_count + 1);
183         /* make sure writer sees reader_count before we check has_writer */
184         smp_mb();
185 
186         /*
187          * has_writer == 0: this means writer will read reader_count as >= 1
188          * has_writer == 1: we don't know if writer read reader_count == 0
189          *                  or > 0, but we need to wait anyways because
190          *                  it will write.
191          */
192         if (!qatomic_read(&has_writer)) {
193             break;
194         }
195 
196         /*
197          * Synchronize access with reader_count() in bdrv_graph_wrlock().
198          * Case 1:
199          * If this critical section gets executed first, reader_count will
200          * decrease and the reader will go to sleep.
201          * Then the writer will read reader_count that does not take into
202          * account this reader, and if there's no other reader it will
203          * enter the write section.
204          * Case 2:
205          * If reader_count() critical section gets executed first,
206          * then writer will read reader_count >= 1.
207          * It will wait in AIO_WAIT_WHILE(), but once it releases the lock
208          * we will enter this critical section and call aio_wait_kick().
209          */
210         WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) {
211             /*
212              * Additional check when we use the above lock to synchronize
213              * with bdrv_graph_wrunlock().
214              * Case 1:
215              * If this gets executed first, has_writer is still 1, so we reduce
216              * reader_count and go to sleep.
217              * Then the writer will set has_writer to 0 and wake up all readers,
218              * us included.
219              * Case 2:
220              * If bdrv_graph_wrunlock() critical section gets executed first,
221              * then it will set has_writer to 0 and wake up all other readers.
222              * Then we execute this critical section, and therefore must check
223              * again for has_writer, otherwise we sleep without any writer
224              * actually running.
225              */
226             if (!qatomic_read(&has_writer)) {
227                 return;
228             }
229 
230             /* slow path where reader sleeps */
231             bdrv_graph->reader_count--;
232             aio_wait_kick();
233             qemu_co_queue_wait(&reader_queue, &aio_context_list_lock);
234         }
235     }
236 #endif
237 }
238 
239 void coroutine_fn bdrv_graph_co_rdunlock(void)
240 {
241 #if 0
242     BdrvGraphRWlock *bdrv_graph;
243     bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;
244 
245     qatomic_store_release(&bdrv_graph->reader_count,
246                           bdrv_graph->reader_count - 1);
247     /* make sure writer sees reader_count before we check has_writer */
248     smp_mb();
249 
250     /*
251      * has_writer == 0: this means reader will read reader_count decreased
252      * has_writer == 1: we don't know if writer read reader_count old or
253      *                  new. Therefore, kick again so on next iteration
254      *                  writer will for sure read the updated value.
255      */
256     if (qatomic_read(&has_writer)) {
257         aio_wait_kick();
258     }
259 #endif
260 }
261 
/*
 * Reader-lock entry point for non-coroutine callers.
 *
 * No lock state is touched here: the function only runs
 * GLOBAL_STATE_CODE() and asserts that the caller is not in a coroutine.
 */
void bdrv_graph_rdlock_main_loop(void)
{
    GLOBAL_STATE_CODE();
    /* coroutine callers have bdrv_graph_co_rdlock() instead */
    assert(!qemu_in_coroutine());
}
267 
/*
 * Counterpart of bdrv_graph_rdlock_main_loop().
 *
 * No lock state is touched here: the function only runs
 * GLOBAL_STATE_CODE() and asserts that the caller is not in a coroutine.
 */
void bdrv_graph_rdunlock_main_loop(void)
{
    GLOBAL_STATE_CODE();
    /* coroutine callers have bdrv_graph_co_rdunlock() instead */
    assert(!qemu_in_coroutine());
}
273 
/*
 * Debug check that the caller may read the graph.
 *
 * Currently compiles to an empty function: the check is under "#if 0" and,
 * inside that, only built when CONFIG_DEBUG_GRAPH_LOCK is defined.
 */
void assert_bdrv_graph_readable(void)
{
    /*
     * reader_count() contends on aio_context_list_lock and is therefore
     * slow, which is why the check is limited to debug builds.
     */
    /* TODO Reenable when wrlock is reenabled */
#if 0
#ifdef CONFIG_DEBUG_GRAPH_LOCK
    assert(qemu_in_main_thread() || reader_count());
#endif
#endif
}
284 
/*
 * Check that the caller may modify the graph.
 *
 * Only the main-thread assertion is active at the moment; the has_writer
 * assertion is under "#if 0".
 */
void assert_bdrv_graph_writable(void)
{
    assert(qemu_in_main_thread());
    /* TODO Reenable when wrlock is reenabled */
#if 0
    assert(qatomic_read(&has_writer));
#endif
}
293