xref: /qemu/iothread.c (revision 6402cbbb)
/*
 * Event loop thread
 *
 * Copyright Red Hat Inc., 2013
 *
 * Authors:
 *  Stefan Hajnoczi   <stefanha@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 *
 */

#include "qemu/osdep.h"
#include "qom/object.h"
#include "qom/object_interfaces.h"
#include "qemu/module.h"
#include "block/aio.h"
#include "block/block.h"
#include "sysemu/iothread.h"
#include "qmp-commands.h"
#include "qemu/error-report.h"
#include "qemu/rcu.h"
#include "qemu/main-loop.h"

typedef ObjectClass IOThreadClass;

#define IOTHREAD_GET_CLASS(obj) \
   OBJECT_GET_CLASS(IOThreadClass, obj, TYPE_IOTHREAD)
#define IOTHREAD_CLASS(klass) \
   OBJECT_CLASS_CHECK(IOThreadClass, klass, TYPE_IOTHREAD)

/* Benchmark results from 2016 on NVMe SSD drives show max polling times around
 * 16-32 microseconds yield IOPS improvements for both iodepth=1 and iodepth=32
 * workloads.
 */
#define IOTHREAD_POLL_MAX_NS_DEFAULT 32768ULL

static __thread IOThread *my_iothread;

AioContext *qemu_get_current_aio_context(void)
{
    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
}

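/* Event loop body executed by each IOThread: publish the thread ID so the
 * creating thread can finish initialization, then keep running aio_poll()
 * until iothread_stop() asks us to exit.
 */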
static void *iothread_run(void *opaque)
{
    IOThread *iothread = opaque;

    rcu_register_thread();

    my_iothread = iothread;
    qemu_mutex_lock(&iothread->init_done_lock);
    iothread->thread_id = qemu_get_thread_id();
    qemu_cond_signal(&iothread->init_done_cond);
    qemu_mutex_unlock(&iothread->init_done_lock);

    while (!atomic_read(&iothread->stopping)) {
        aio_poll(iothread->ctx, true);
    }

    rcu_unregister_thread();
    return NULL;
}

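/* Signal the event loop thread to stop and wait for it to terminate.  The
 * object_dynamic_cast() check makes this safe to use as an
 * object_child_foreach() callback over arbitrary objects.
 */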
static int iothread_stop(Object *object, void *opaque)
{
    IOThread *iothread;

    iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
    if (!iothread || !iothread->ctx) {
        return 0;
    }
    iothread->stopping = true;
    aio_notify(iothread->ctx);
    qemu_thread_join(&iothread->thread);
    return 0;
}

static void iothread_instance_init(Object *obj)
{
    IOThread *iothread = IOTHREAD(obj);

    iothread->poll_max_ns = IOTHREAD_POLL_MAX_NS_DEFAULT;
}

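/* Stop the event loop thread (if any) and release the AioContext when the
 * object is finalized.
 */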
static void iothread_instance_finalize(Object *obj)
{
    IOThread *iothread = IOTHREAD(obj);

    iothread_stop(obj, NULL);
    qemu_cond_destroy(&iothread->init_done_cond);
    qemu_mutex_destroy(&iothread->init_done_lock);
    if (!iothread->ctx) {
        return;
    }
    aio_context_unref(iothread->ctx);
}

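/* UserCreatable completion hook: create the AioContext, apply the polling
 * parameters, spawn the event loop thread and wait until it has reported
 * its thread ID.
 */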
static void iothread_complete(UserCreatable *obj, Error **errp)
{
    Error *local_error = NULL;
    IOThread *iothread = IOTHREAD(obj);
    char *name, *thread_name;

    iothread->stopping = false;
    iothread->thread_id = -1;
    iothread->ctx = aio_context_new(&local_error);
    if (!iothread->ctx) {
        error_propagate(errp, local_error);
        return;
    }

    aio_context_set_poll_params(iothread->ctx,
                                iothread->poll_max_ns,
                                iothread->poll_grow,
                                iothread->poll_shrink,
                                &local_error);
    if (local_error) {
        error_propagate(errp, local_error);
        aio_context_unref(iothread->ctx);
        iothread->ctx = NULL;
        return;
    }

    qemu_mutex_init(&iothread->init_done_lock);
    qemu_cond_init(&iothread->init_done_cond);

    /* This assumes we are called from a thread with useful CPU affinity for us
     * to inherit.
     */
    name = object_get_canonical_path_component(OBJECT(obj));
    thread_name = g_strdup_printf("IO %s", name);
    qemu_thread_create(&iothread->thread, thread_name, iothread_run,
                       iothread, QEMU_THREAD_JOINABLE);
    g_free(thread_name);
    g_free(name);

    /* Wait for initialization to complete */
    qemu_mutex_lock(&iothread->init_done_lock);
    while (iothread->thread_id == -1) {
        qemu_cond_wait(&iothread->init_done_cond,
                       &iothread->init_done_lock);
    }
    qemu_mutex_unlock(&iothread->init_done_lock);
}

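/* The polling properties below share one generic getter/setter pair;
 * PollParamInfo records each property's name and the offset of its backing
 * field inside the IOThread struct.
 */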
typedef struct {
    const char *name;
    ptrdiff_t offset; /* field's byte offset in IOThread struct */
} PollParamInfo;

static PollParamInfo poll_max_ns_info = {
    "poll-max-ns", offsetof(IOThread, poll_max_ns),
};
static PollParamInfo poll_grow_info = {
    "poll-grow", offsetof(IOThread, poll_grow),
};
static PollParamInfo poll_shrink_info = {
    "poll-shrink", offsetof(IOThread, poll_shrink),
};

static void iothread_get_poll_param(Object *obj, Visitor *v,
        const char *name, void *opaque, Error **errp)
{
    IOThread *iothread = IOTHREAD(obj);
    PollParamInfo *info = opaque;
    int64_t *field = (void *)iothread + info->offset;

    visit_type_int64(v, name, field, errp);
}

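/* Generic setter: reject negative values and, if the AioContext already
 * exists, apply the new polling parameters immediately.
 */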
static void iothread_set_poll_param(Object *obj, Visitor *v,
        const char *name, void *opaque, Error **errp)
{
    IOThread *iothread = IOTHREAD(obj);
    PollParamInfo *info = opaque;
    int64_t *field = (void *)iothread + info->offset;
    Error *local_err = NULL;
    int64_t value;

    visit_type_int64(v, name, &value, &local_err);
    if (local_err) {
        goto out;
    }

    if (value < 0) {
        error_setg(&local_err, "%s value must be in range [0, %"PRId64"]",
                   info->name, INT64_MAX);
        goto out;
    }

    *field = value;

    if (iothread->ctx) {
        aio_context_set_poll_params(iothread->ctx,
                                    iothread->poll_max_ns,
                                    iothread->poll_grow,
                                    iothread->poll_shrink,
                                    &local_err);
    }

out:
    error_propagate(errp, local_err);
}

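/* Register the UserCreatable completion hook and the polling properties.
 * An IOThread can be created with, for example,
 *   -object iothread,id=iothread0,poll-max-ns=32768
 * and the polling properties can also be changed at runtime via qom-set.
 */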
static void iothread_class_init(ObjectClass *klass, void *class_data)
{
    UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
    ucc->complete = iothread_complete;

    object_class_property_add(klass, "poll-max-ns", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_max_ns_info, &error_abort);
    object_class_property_add(klass, "poll-grow", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_grow_info, &error_abort);
    object_class_property_add(klass, "poll-shrink", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_shrink_info, &error_abort);
}

static const TypeInfo iothread_info = {
    .name = TYPE_IOTHREAD,
    .parent = TYPE_OBJECT,
    .class_init = iothread_class_init,
    .instance_size = sizeof(IOThread),
    .instance_init = iothread_instance_init,
    .instance_finalize = iothread_instance_finalize,
    .interfaces = (InterfaceInfo[]) {
        {TYPE_USER_CREATABLE},
        {}
    },
};

static void iothread_register_types(void)
{
    type_register_static(&iothread_info);
}

type_init(iothread_register_types)

char *iothread_get_id(IOThread *iothread)
{
    return object_get_canonical_path_component(OBJECT(iothread));
}

AioContext *iothread_get_aio_context(IOThread *iothread)
{
    return iothread->ctx;
}

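/* object_child_foreach() callback: append one IOThreadInfo entry to the list
 * being built for query-iothreads, skipping objects that are not IOThreads.
 */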
static int query_one_iothread(Object *object, void *opaque)
{
    IOThreadInfoList ***prev = opaque;
    IOThreadInfoList *elem;
    IOThreadInfo *info;
    IOThread *iothread;

    iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
    if (!iothread) {
        return 0;
    }

    info = g_new0(IOThreadInfo, 1);
    info->id = iothread_get_id(iothread);
    info->thread_id = iothread->thread_id;
    info->poll_max_ns = iothread->poll_max_ns;
    info->poll_grow = iothread->poll_grow;
    info->poll_shrink = iothread->poll_shrink;

    elem = g_new0(IOThreadInfoList, 1);
    elem->value = info;
    elem->next = NULL;

    **prev = elem;
    *prev = &elem->next;
    return 0;
}

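/* QMP 'query-iothreads' handler: walk the objects root and collect info
 * about every IOThread.
 */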
IOThreadInfoList *qmp_query_iothreads(Error **errp)
{
    IOThreadInfoList *head = NULL;
    IOThreadInfoList **prev = &head;
    Object *container = object_get_objects_root();

    object_child_foreach(container, query_one_iothread, &prev);
    return head;
}

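/* Move every block driver state back to the main loop's AioContext, then
 * stop all IOThreads; used when shutting down.
 */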
void iothread_stop_all(void)
{
    Object *container = object_get_objects_root();
    BlockDriverState *bs;
    BdrvNextIterator it;

    for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) {
        AioContext *ctx = bdrv_get_aio_context(bs);
        if (ctx == qemu_get_aio_context()) {
            continue;
        }
        aio_context_acquire(ctx);
        bdrv_set_aio_context(bs, qemu_get_aio_context());
        aio_context_release(ctx);
    }

    object_child_foreach(container, iothread_stop, NULL);
}