/* * Graph lock: rwlock to protect block layer graph manipulations (add/remove * edges and nodes) * * Copyright (c) 2022 Red Hat * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, see . */ #include "qemu/osdep.h" #include "qemu/main-loop.h" #include "block/graph-lock.h" #include "block/block.h" #include "block/block_int.h" /* Dummy lock object to use for Thread Safety Analysis (TSA) */ BdrvGraphLock graph_lock; /* Protects the list of aiocontext and orphaned_reader_count */ static QemuMutex aio_context_list_lock; /* Written and read with atomic operations. */ static int has_writer; /* * A reader coroutine could move from an AioContext to another. * If this happens, there is no problem from the point of view of * counters. The problem is that the total count becomes * unbalanced if one of the two AioContexts gets deleted. * The count of readers must remain correct, so the AioContext's * balance is transferred to this glboal variable. * Protected by aio_context_list_lock. */ static uint32_t orphaned_reader_count; /* Queue of readers waiting for the writer to finish */ static CoQueue reader_queue; struct BdrvGraphRWlock { /* How many readers are currently reading the graph. */ uint32_t reader_count; /* * List of BdrvGraphRWlock kept in graph-lock.c * Protected by aio_context_list_lock */ QTAILQ_ENTRY(BdrvGraphRWlock) next_aio; }; /* * List of BdrvGraphRWlock. This list ensures that each BdrvGraphRWlock * can safely modify only its own counter, avoid reading/writing * others and thus improving performances by avoiding cacheline bounces. */ static QTAILQ_HEAD(, BdrvGraphRWlock) aio_context_list = QTAILQ_HEAD_INITIALIZER(aio_context_list); static void __attribute__((__constructor__)) bdrv_init_graph_lock(void) { qemu_mutex_init(&aio_context_list_lock); qemu_co_queue_init(&reader_queue); } void register_aiocontext(AioContext *ctx) { ctx->bdrv_graph = g_new0(BdrvGraphRWlock, 1); QEMU_LOCK_GUARD(&aio_context_list_lock); assert(ctx->bdrv_graph->reader_count == 0); QTAILQ_INSERT_TAIL(&aio_context_list, ctx->bdrv_graph, next_aio); } void unregister_aiocontext(AioContext *ctx) { QEMU_LOCK_GUARD(&aio_context_list_lock); orphaned_reader_count += ctx->bdrv_graph->reader_count; QTAILQ_REMOVE(&aio_context_list, ctx->bdrv_graph, next_aio); g_free(ctx->bdrv_graph); } static uint32_t reader_count(void) { BdrvGraphRWlock *brdv_graph; uint32_t rd; QEMU_LOCK_GUARD(&aio_context_list_lock); /* rd can temporarily be negative, but the total will *always* be >= 0 */ rd = orphaned_reader_count; QTAILQ_FOREACH(brdv_graph, &aio_context_list, next_aio) { rd += qatomic_read(&brdv_graph->reader_count); } /* shouldn't overflow unless there are 2^31 readers */ assert((int32_t)rd >= 0); return rd; } void no_coroutine_fn bdrv_graph_wrlock(BlockDriverState *bs) { AioContext *ctx = NULL; GLOBAL_STATE_CODE(); assert(!qatomic_read(&has_writer)); assert(!qemu_in_coroutine()); /* * Release only non-mainloop AioContext. The mainloop often relies on the * BQL and doesn't lock the main AioContext before doing things. */ if (bs) { ctx = bdrv_get_aio_context(bs); if (ctx != qemu_get_aio_context()) { aio_context_release(ctx); } else { ctx = NULL; } } /* Make sure that constantly arriving new I/O doesn't cause starvation */ bdrv_drain_all_begin_nopoll(); /* * reader_count == 0: this means writer will read has_reader as 1 * reader_count >= 1: we don't know if writer read has_writer == 0 or 1, * but we need to wait. * Wait by allowing other coroutine (and possible readers) to continue. */ do { /* * has_writer must be 0 while polling, otherwise we get a deadlock if * any callback involved during AIO_WAIT_WHILE() tries to acquire the * reader lock. */ qatomic_set(&has_writer, 0); AIO_WAIT_WHILE_UNLOCKED(NULL, reader_count() >= 1); qatomic_set(&has_writer, 1); /* * We want to only check reader_count() after has_writer = 1 is visible * to other threads. That way no more readers can sneak in after we've * determined reader_count() == 0. */ smp_mb(); } while (reader_count() >= 1); bdrv_drain_all_end(); if (ctx) { aio_context_acquire(bdrv_get_aio_context(bs)); } } void no_coroutine_fn bdrv_graph_wrunlock_ctx(AioContext *ctx) { GLOBAL_STATE_CODE(); assert(qatomic_read(&has_writer)); /* * Release only non-mainloop AioContext. The mainloop often relies on the * BQL and doesn't lock the main AioContext before doing things. */ if (ctx && ctx != qemu_get_aio_context()) { aio_context_release(ctx); } else { ctx = NULL; } WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) { /* * No need for memory barriers, this works in pair with * the slow path of rdlock() and both take the lock. */ qatomic_store_release(&has_writer, 0); /* Wake up all coroutines that are waiting to read the graph */ qemu_co_enter_all(&reader_queue, &aio_context_list_lock); } /* * Run any BHs that were scheduled during the wrlock section and that * callers might expect to have finished (in particular, this is important * for bdrv_schedule_unref()). * * Do this only after restarting coroutines so that nested event loops in * BHs don't deadlock if their condition relies on the coroutine making * progress. */ aio_bh_poll(qemu_get_aio_context()); if (ctx) { aio_context_acquire(ctx); } } void no_coroutine_fn bdrv_graph_wrunlock(BlockDriverState *bs) { AioContext *ctx = bs ? bdrv_get_aio_context(bs) : NULL; bdrv_graph_wrunlock_ctx(ctx); } void coroutine_fn bdrv_graph_co_rdlock(void) { BdrvGraphRWlock *bdrv_graph; bdrv_graph = qemu_get_current_aio_context()->bdrv_graph; for (;;) { qatomic_set(&bdrv_graph->reader_count, bdrv_graph->reader_count + 1); /* make sure writer sees reader_count before we check has_writer */ smp_mb(); /* * has_writer == 0: this means writer will read reader_count as >= 1 * has_writer == 1: we don't know if writer read reader_count == 0 * or > 0, but we need to wait anyways because * it will write. */ if (!qatomic_read(&has_writer)) { break; } /* * Synchronize access with reader_count() in bdrv_graph_wrlock(). * Case 1: * If this critical section gets executed first, reader_count will * decrease and the reader will go to sleep. * Then the writer will read reader_count that does not take into * account this reader, and if there's no other reader it will * enter the write section. * Case 2: * If reader_count() critical section gets executed first, * then writer will read reader_count >= 1. * It will wait in AIO_WAIT_WHILE(), but once it releases the lock * we will enter this critical section and call aio_wait_kick(). */ WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) { /* * Additional check when we use the above lock to synchronize * with bdrv_graph_wrunlock(). * Case 1: * If this gets executed first, has_writer is still 1, so we reduce * reader_count and go to sleep. * Then the writer will set has_writer to 0 and wake up all readers, * us included. * Case 2: * If bdrv_graph_wrunlock() critical section gets executed first, * then it will set has_writer to 0 and wake up all other readers. * Then we execute this critical section, and therefore must check * again for has_writer, otherwise we sleep without any writer * actually running. */ if (!qatomic_read(&has_writer)) { return; } /* slow path where reader sleeps */ bdrv_graph->reader_count--; aio_wait_kick(); qemu_co_queue_wait(&reader_queue, &aio_context_list_lock); } } } void coroutine_fn bdrv_graph_co_rdunlock(void) { BdrvGraphRWlock *bdrv_graph; bdrv_graph = qemu_get_current_aio_context()->bdrv_graph; qatomic_store_release(&bdrv_graph->reader_count, bdrv_graph->reader_count - 1); /* make sure writer sees reader_count before we check has_writer */ smp_mb(); /* * has_writer == 0: this means reader will read reader_count decreased * has_writer == 1: we don't know if writer read reader_count old or * new. Therefore, kick again so on next iteration * writer will for sure read the updated value. */ if (qatomic_read(&has_writer)) { aio_wait_kick(); } } void bdrv_graph_rdlock_main_loop(void) { GLOBAL_STATE_CODE(); assert(!qemu_in_coroutine()); } void bdrv_graph_rdunlock_main_loop(void) { GLOBAL_STATE_CODE(); assert(!qemu_in_coroutine()); } void assert_bdrv_graph_readable(void) { /* reader_count() is slow due to aio_context_list_lock lock contention */ #ifdef CONFIG_DEBUG_GRAPH_LOCK assert(qemu_in_main_thread() || reader_count()); #endif } void assert_bdrv_graph_writable(void) { assert(qemu_in_main_thread()); assert(qatomic_read(&has_writer)); }