xref: /qemu/block/throttle-groups.c (revision d046c51d)
1 /*
2  * QEMU block throttling group infrastructure
3  *
4  * Copyright (C) Nodalink, EURL. 2014
5  * Copyright (C) Igalia, S.L. 2015
6  *
7  * Authors:
8  *   Benoît Canet <benoit.canet@nodalink.com>
9  *   Alberto Garcia <berto@igalia.com>
10  *
11  * This program is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU General Public License as
13  * published by the Free Software Foundation; either version 2 or
14  * (at your option) version 3 of the License.
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with this program; if not, see <http://www.gnu.org/licenses/>.
23  */
24 
25 #include "qemu/osdep.h"
26 #include "block/throttle-groups.h"
27 #include "qemu/queue.h"
28 #include "qemu/thread.h"
29 #include "sysemu/qtest.h"
30 
31 /* The ThrottleGroup structure (with its ThrottleState) is shared
32  * among different BlockDriverState and it's independent from
33  * AioContext, so in order to use it from different threads it needs
34  * its own locking.
35  *
36  * This locking is however handled internally in this file, so it's
37  * transparent to outside users.
38  *
39  * The whole ThrottleGroup structure is private and invisible to
40  * outside users, that only use it through its ThrottleState.
41  *
42  * In addition to the ThrottleGroup structure, BlockDriverState has
43  * fields that need to be accessed by other members of the group and
44  * therefore also need to be protected by this lock. Once a BDS is
45  * registered in a group those fields can be accessed by other threads
46  * any time.
47  *
48  * Again, all this is handled internally and is mostly transparent to
49  * the outside. The 'throttle_timers' field however has an additional
50  * constraint because it may be temporarily invalid (see for example
51  * bdrv_set_aio_context()). Therefore in this file a thread will
52  * access some other BDS's timers only after verifying that that BDS
53  * has throttled requests in the queue.
54  */
55 typedef struct ThrottleGroup {
56     char *name; /* This is constant during the lifetime of the group */
57 
58     QemuMutex lock; /* This lock protects the following four fields */
59     ThrottleState ts;
60     QLIST_HEAD(, BlockDriverState) head;
61     BlockDriverState *tokens[2];
62     bool any_timer_armed[2];
63 
64     /* These two are protected by the global throttle_groups_lock */
65     unsigned refcount;
66     QTAILQ_ENTRY(ThrottleGroup) list;
67 } ThrottleGroup;
68 
69 static QemuMutex throttle_groups_lock;
70 static QTAILQ_HEAD(, ThrottleGroup) throttle_groups =
71     QTAILQ_HEAD_INITIALIZER(throttle_groups);
72 
73 /* Increments the reference count of a ThrottleGroup given its name.
74  *
75  * If no ThrottleGroup is found with the given name a new one is
76  * created.
77  *
78  * @name: the name of the ThrottleGroup
79  * @ret:  the ThrottleState member of the ThrottleGroup
80  */
81 ThrottleState *throttle_group_incref(const char *name)
82 {
83     ThrottleGroup *tg = NULL;
84     ThrottleGroup *iter;
85 
86     qemu_mutex_lock(&throttle_groups_lock);
87 
88     /* Look for an existing group with that name */
89     QTAILQ_FOREACH(iter, &throttle_groups, list) {
90         if (!strcmp(name, iter->name)) {
91             tg = iter;
92             break;
93         }
94     }
95 
96     /* Create a new one if not found */
97     if (!tg) {
98         tg = g_new0(ThrottleGroup, 1);
99         tg->name = g_strdup(name);
100         qemu_mutex_init(&tg->lock);
101         throttle_init(&tg->ts);
102         QLIST_INIT(&tg->head);
103 
104         QTAILQ_INSERT_TAIL(&throttle_groups, tg, list);
105     }
106 
107     tg->refcount++;
108 
109     qemu_mutex_unlock(&throttle_groups_lock);
110 
111     return &tg->ts;
112 }
113 
114 /* Decrease the reference count of a ThrottleGroup.
115  *
116  * When the reference count reaches zero the ThrottleGroup is
117  * destroyed.
118  *
119  * @ts:  The ThrottleGroup to unref, given by its ThrottleState member
120  */
121 void throttle_group_unref(ThrottleState *ts)
122 {
123     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
124 
125     qemu_mutex_lock(&throttle_groups_lock);
126     if (--tg->refcount == 0) {
127         QTAILQ_REMOVE(&throttle_groups, tg, list);
128         qemu_mutex_destroy(&tg->lock);
129         g_free(tg->name);
130         g_free(tg);
131     }
132     qemu_mutex_unlock(&throttle_groups_lock);
133 }
134 
135 /* Get the name from a BlockDriverState's ThrottleGroup. The name (and
136  * the pointer) is guaranteed to remain constant during the lifetime
137  * of the group.
138  *
139  * @bs:   a BlockDriverState that is member of a throttling group
140  * @ret:  the name of the group.
141  */
142 const char *throttle_group_get_name(BlockDriverState *bs)
143 {
144     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
145     return tg->name;
146 }
147 
148 /* Return the next BlockDriverState in the round-robin sequence,
149  * simulating a circular list.
150  *
151  * This assumes that tg->lock is held.
152  *
153  * @bs:  the current BlockDriverState
154  * @ret: the next BlockDriverState in the sequence
155  */
156 static BlockDriverState *throttle_group_next_bs(BlockDriverState *bs)
157 {
158     ThrottleState *ts = bs->throttle_state;
159     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
160     BlockDriverState *next = QLIST_NEXT(bs, round_robin);
161 
162     if (!next) {
163         return QLIST_FIRST(&tg->head);
164     }
165 
166     return next;
167 }
168 
169 /* Return the next BlockDriverState in the round-robin sequence with
170  * pending I/O requests.
171  *
172  * This assumes that tg->lock is held.
173  *
174  * @bs:        the current BlockDriverState
175  * @is_write:  the type of operation (read/write)
176  * @ret:       the next BlockDriverState with pending requests, or bs
177  *             if there is none.
178  */
179 static BlockDriverState *next_throttle_token(BlockDriverState *bs,
180                                              bool is_write)
181 {
182     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
183     BlockDriverState *token, *start;
184 
185     start = token = tg->tokens[is_write];
186 
187     /* get next bs round in round robin style */
188     token = throttle_group_next_bs(token);
189     while (token != start && !token->pending_reqs[is_write]) {
190         token = throttle_group_next_bs(token);
191     }
192 
193     /* If no IO are queued for scheduling on the next round robin token
194      * then decide the token is the current bs because chances are
195      * the current bs get the current request queued.
196      */
197     if (token == start && !token->pending_reqs[is_write]) {
198         token = bs;
199     }
200 
201     return token;
202 }
203 
204 /* Check if the next I/O request for a BlockDriverState needs to be
205  * throttled or not. If there's no timer set in this group, set one
206  * and update the token accordingly.
207  *
208  * This assumes that tg->lock is held.
209  *
210  * @bs:         the current BlockDriverState
211  * @is_write:   the type of operation (read/write)
212  * @ret:        whether the I/O request needs to be throttled or not
213  */
214 static bool throttle_group_schedule_timer(BlockDriverState *bs,
215                                           bool is_write)
216 {
217     ThrottleState *ts = bs->throttle_state;
218     ThrottleTimers *tt = &bs->throttle_timers;
219     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
220     bool must_wait;
221 
222     if (bs->io_limits_disabled) {
223         return false;
224     }
225 
226     /* Check if any of the timers in this group is already armed */
227     if (tg->any_timer_armed[is_write]) {
228         return true;
229     }
230 
231     must_wait = throttle_schedule_timer(ts, tt, is_write);
232 
233     /* If a timer just got armed, set bs as the current token */
234     if (must_wait) {
235         tg->tokens[is_write] = bs;
236         tg->any_timer_armed[is_write] = true;
237     }
238 
239     return must_wait;
240 }
241 
242 /* Look for the next pending I/O request and schedule it.
243  *
244  * This assumes that tg->lock is held.
245  *
246  * @bs:        the current BlockDriverState
247  * @is_write:  the type of operation (read/write)
248  */
249 static void schedule_next_request(BlockDriverState *bs, bool is_write)
250 {
251     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
252     bool must_wait;
253     BlockDriverState *token;
254 
255     /* Check if there's any pending request to schedule next */
256     token = next_throttle_token(bs, is_write);
257     if (!token->pending_reqs[is_write]) {
258         return;
259     }
260 
261     /* Set a timer for the request if it needs to be throttled */
262     must_wait = throttle_group_schedule_timer(token, is_write);
263 
264     /* If it doesn't have to wait, queue it for immediate execution */
265     if (!must_wait) {
266         /* Give preference to requests from the current bs */
267         if (qemu_in_coroutine() &&
268             qemu_co_queue_next(&bs->throttled_reqs[is_write])) {
269             token = bs;
270         } else {
271             ThrottleTimers *tt = &token->throttle_timers;
272             int64_t now = qemu_clock_get_ns(tt->clock_type);
273             timer_mod(tt->timers[is_write], now + 1);
274             tg->any_timer_armed[is_write] = true;
275         }
276         tg->tokens[is_write] = token;
277     }
278 }
279 
280 /* Check if an I/O request needs to be throttled, wait and set a timer
281  * if necessary, and schedule the next request using a round robin
282  * algorithm.
283  *
284  * @bs:        the current BlockDriverState
285  * @bytes:     the number of bytes for this I/O
286  * @is_write:  the type of operation (read/write)
287  */
288 void coroutine_fn throttle_group_co_io_limits_intercept(BlockDriverState *bs,
289                                                         unsigned int bytes,
290                                                         bool is_write)
291 {
292     bool must_wait;
293     BlockDriverState *token;
294 
295     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
296     qemu_mutex_lock(&tg->lock);
297 
298     /* First we check if this I/O has to be throttled. */
299     token = next_throttle_token(bs, is_write);
300     must_wait = throttle_group_schedule_timer(token, is_write);
301 
302     /* Wait if there's a timer set or queued requests of this type */
303     if (must_wait || bs->pending_reqs[is_write]) {
304         bs->pending_reqs[is_write]++;
305         qemu_mutex_unlock(&tg->lock);
306         qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
307         qemu_mutex_lock(&tg->lock);
308         bs->pending_reqs[is_write]--;
309     }
310 
311     /* The I/O will be executed, so do the accounting */
312     throttle_account(bs->throttle_state, is_write, bytes);
313 
314     /* Schedule the next request */
315     schedule_next_request(bs, is_write);
316 
317     qemu_mutex_unlock(&tg->lock);
318 }
319 
320 void throttle_group_restart_bs(BlockDriverState *bs)
321 {
322     int i;
323 
324     for (i = 0; i < 2; i++) {
325         while (qemu_co_enter_next(&bs->throttled_reqs[i])) {
326             ;
327         }
328     }
329 }
330 
331 /* Update the throttle configuration for a particular group. Similar
332  * to throttle_config(), but guarantees atomicity within the
333  * throttling group.
334  *
335  * @bs:  a BlockDriverState that is member of the group
336  * @cfg: the configuration to set
337  */
338 void throttle_group_config(BlockDriverState *bs, ThrottleConfig *cfg)
339 {
340     ThrottleTimers *tt = &bs->throttle_timers;
341     ThrottleState *ts = bs->throttle_state;
342     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
343     qemu_mutex_lock(&tg->lock);
344     /* throttle_config() cancels the timers */
345     if (timer_pending(tt->timers[0])) {
346         tg->any_timer_armed[0] = false;
347     }
348     if (timer_pending(tt->timers[1])) {
349         tg->any_timer_armed[1] = false;
350     }
351     throttle_config(ts, tt, cfg);
352     qemu_mutex_unlock(&tg->lock);
353 
354     qemu_co_enter_next(&bs->throttled_reqs[0]);
355     qemu_co_enter_next(&bs->throttled_reqs[1]);
356 }
357 
358 /* Get the throttle configuration from a particular group. Similar to
359  * throttle_get_config(), but guarantees atomicity within the
360  * throttling group.
361  *
362  * @bs:  a BlockDriverState that is member of the group
363  * @cfg: the configuration will be written here
364  */
365 void throttle_group_get_config(BlockDriverState *bs, ThrottleConfig *cfg)
366 {
367     ThrottleState *ts = bs->throttle_state;
368     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
369     qemu_mutex_lock(&tg->lock);
370     throttle_get_config(ts, cfg);
371     qemu_mutex_unlock(&tg->lock);
372 }
373 
374 /* ThrottleTimers callback. This wakes up a request that was waiting
375  * because it had been throttled.
376  *
377  * @bs:        the BlockDriverState whose request had been throttled
378  * @is_write:  the type of operation (read/write)
379  */
380 static void timer_cb(BlockDriverState *bs, bool is_write)
381 {
382     ThrottleState *ts = bs->throttle_state;
383     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
384     bool empty_queue;
385 
386     /* The timer has just been fired, so we can update the flag */
387     qemu_mutex_lock(&tg->lock);
388     tg->any_timer_armed[is_write] = false;
389     qemu_mutex_unlock(&tg->lock);
390 
391     /* Run the request that was waiting for this timer */
392     empty_queue = !qemu_co_enter_next(&bs->throttled_reqs[is_write]);
393 
394     /* If the request queue was empty then we have to take care of
395      * scheduling the next one */
396     if (empty_queue) {
397         qemu_mutex_lock(&tg->lock);
398         schedule_next_request(bs, is_write);
399         qemu_mutex_unlock(&tg->lock);
400     }
401 }
402 
/* Timer callback for reads: forwards @opaque to timer_cb() with
 * is_write set to false */
static void read_timer_cb(void *opaque)
{
    const bool is_write = false;

    timer_cb(opaque, is_write);
}

/* Timer callback for writes: forwards @opaque to timer_cb() with
 * is_write set to true */
static void write_timer_cb(void *opaque)
{
    const bool is_write = true;

    timer_cb(opaque, is_write);
}
412 
413 /* Register a BlockDriverState in the throttling group, also
414  * initializing its timers and updating its throttle_state pointer to
415  * point to it. If a throttling group with that name does not exist
416  * yet, it will be created.
417  *
418  * @bs:        the BlockDriverState to insert
419  * @groupname: the name of the group
420  */
421 void throttle_group_register_bs(BlockDriverState *bs, const char *groupname)
422 {
423     int i;
424     ThrottleState *ts = throttle_group_incref(groupname);
425     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
426     int clock_type = QEMU_CLOCK_REALTIME;
427 
428     if (qtest_enabled()) {
429         /* For testing block IO throttling only */
430         clock_type = QEMU_CLOCK_VIRTUAL;
431     }
432 
433     bs->throttle_state = ts;
434 
435     qemu_mutex_lock(&tg->lock);
436     /* If the ThrottleGroup is new set this BlockDriverState as the token */
437     for (i = 0; i < 2; i++) {
438         if (!tg->tokens[i]) {
439             tg->tokens[i] = bs;
440         }
441     }
442 
443     QLIST_INSERT_HEAD(&tg->head, bs, round_robin);
444 
445     throttle_timers_init(&bs->throttle_timers,
446                          bdrv_get_aio_context(bs),
447                          clock_type,
448                          read_timer_cb,
449                          write_timer_cb,
450                          bs);
451 
452     qemu_mutex_unlock(&tg->lock);
453 }
454 
455 /* Unregister a BlockDriverState from its group, removing it from the
456  * list, destroying the timers and setting the throttle_state pointer
457  * to NULL.
458  *
459  * The BlockDriverState must not have pending throttled requests, so
460  * the caller has to drain them first.
461  *
462  * The group will be destroyed if it's empty after this operation.
463  *
464  * @bs: the BlockDriverState to remove
465  */
466 void throttle_group_unregister_bs(BlockDriverState *bs)
467 {
468     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
469     int i;
470 
471     assert(bs->pending_reqs[0] == 0 && bs->pending_reqs[1] == 0);
472     assert(qemu_co_queue_empty(&bs->throttled_reqs[0]));
473     assert(qemu_co_queue_empty(&bs->throttled_reqs[1]));
474 
475     qemu_mutex_lock(&tg->lock);
476     for (i = 0; i < 2; i++) {
477         if (tg->tokens[i] == bs) {
478             BlockDriverState *token = throttle_group_next_bs(bs);
479             /* Take care of the case where this is the last bs in the group */
480             if (token == bs) {
481                 token = NULL;
482             }
483             tg->tokens[i] = token;
484         }
485     }
486 
487     /* remove the current bs from the list */
488     QLIST_REMOVE(bs, round_robin);
489     throttle_timers_destroy(&bs->throttle_timers);
490     qemu_mutex_unlock(&tg->lock);
491 
492     throttle_group_unref(&tg->ts);
493     bs->throttle_state = NULL;
494 }
495 
496 static void throttle_groups_init(void)
497 {
498     qemu_mutex_init(&throttle_groups_lock);
499 }
500 
501 block_init(throttle_groups_init);
502