xref: /qemu/iothread.c (revision 33848cee)
1 /*
2  * Event loop thread
3  *
4  * Copyright Red Hat Inc., 2013
5  *
6  * Authors:
7  *  Stefan Hajnoczi   <stefanha@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  *
12  */
13 
14 #include "qemu/osdep.h"
15 #include "qom/object.h"
16 #include "qom/object_interfaces.h"
17 #include "qemu/module.h"
18 #include "block/aio.h"
19 #include "block/block.h"
20 #include "sysemu/iothread.h"
21 #include "qmp-commands.h"
22 #include "qemu/error-report.h"
23 #include "qemu/rcu.h"
24 #include "qemu/main-loop.h"
25 
26 typedef ObjectClass IOThreadClass;
27 
28 #define IOTHREAD_GET_CLASS(obj) \
29    OBJECT_GET_CLASS(IOThreadClass, obj, TYPE_IOTHREAD)
30 #define IOTHREAD_CLASS(klass) \
31    OBJECT_CLASS_CHECK(IOThreadClass, klass, TYPE_IOTHREAD)
32 
33 static __thread IOThread *my_iothread;
34 
35 AioContext *qemu_get_current_aio_context(void)
36 {
37     return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
38 }
39 
40 static void *iothread_run(void *opaque)
41 {
42     IOThread *iothread = opaque;
43 
44     rcu_register_thread();
45 
46     my_iothread = iothread;
47     qemu_mutex_lock(&iothread->init_done_lock);
48     iothread->thread_id = qemu_get_thread_id();
49     qemu_cond_signal(&iothread->init_done_cond);
50     qemu_mutex_unlock(&iothread->init_done_lock);
51 
52     while (!atomic_read(&iothread->stopping)) {
53         aio_poll(iothread->ctx, true);
54     }
55 
56     rcu_unregister_thread();
57     return NULL;
58 }
59 
60 static int iothread_stop(Object *object, void *opaque)
61 {
62     IOThread *iothread;
63 
64     iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
65     if (!iothread || !iothread->ctx) {
66         return 0;
67     }
68     iothread->stopping = true;
69     aio_notify(iothread->ctx);
70     qemu_thread_join(&iothread->thread);
71     return 0;
72 }
73 
74 static void iothread_instance_finalize(Object *obj)
75 {
76     IOThread *iothread = IOTHREAD(obj);
77 
78     iothread_stop(obj, NULL);
79     qemu_cond_destroy(&iothread->init_done_cond);
80     qemu_mutex_destroy(&iothread->init_done_lock);
81     if (!iothread->ctx) {
82         return;
83     }
84     aio_context_unref(iothread->ctx);
85 }
86 
87 static void iothread_complete(UserCreatable *obj, Error **errp)
88 {
89     Error *local_error = NULL;
90     IOThread *iothread = IOTHREAD(obj);
91     char *name, *thread_name;
92 
93     iothread->stopping = false;
94     iothread->thread_id = -1;
95     iothread->ctx = aio_context_new(&local_error);
96     if (!iothread->ctx) {
97         error_propagate(errp, local_error);
98         return;
99     }
100 
101     aio_context_set_poll_params(iothread->ctx,
102                                 iothread->poll_max_ns,
103                                 iothread->poll_grow,
104                                 iothread->poll_shrink,
105                                 &local_error);
106     if (local_error) {
107         error_propagate(errp, local_error);
108         aio_context_unref(iothread->ctx);
109         iothread->ctx = NULL;
110         return;
111     }
112 
113     qemu_mutex_init(&iothread->init_done_lock);
114     qemu_cond_init(&iothread->init_done_cond);
115 
116     /* This assumes we are called from a thread with useful CPU affinity for us
117      * to inherit.
118      */
119     name = object_get_canonical_path_component(OBJECT(obj));
120     thread_name = g_strdup_printf("IO %s", name);
121     qemu_thread_create(&iothread->thread, thread_name, iothread_run,
122                        iothread, QEMU_THREAD_JOINABLE);
123     g_free(thread_name);
124     g_free(name);
125 
126     /* Wait for initialization to complete */
127     qemu_mutex_lock(&iothread->init_done_lock);
128     while (iothread->thread_id == -1) {
129         qemu_cond_wait(&iothread->init_done_cond,
130                        &iothread->init_done_lock);
131     }
132     qemu_mutex_unlock(&iothread->init_done_lock);
133 }
134 
135 typedef struct {
136     const char *name;
137     ptrdiff_t offset; /* field's byte offset in IOThread struct */
138 } PollParamInfo;
139 
140 static PollParamInfo poll_max_ns_info = {
141     "poll-max-ns", offsetof(IOThread, poll_max_ns),
142 };
143 static PollParamInfo poll_grow_info = {
144     "poll-grow", offsetof(IOThread, poll_grow),
145 };
146 static PollParamInfo poll_shrink_info = {
147     "poll-shrink", offsetof(IOThread, poll_shrink),
148 };
149 
150 static void iothread_get_poll_param(Object *obj, Visitor *v,
151         const char *name, void *opaque, Error **errp)
152 {
153     IOThread *iothread = IOTHREAD(obj);
154     PollParamInfo *info = opaque;
155     int64_t *field = (void *)iothread + info->offset;
156 
157     visit_type_int64(v, name, field, errp);
158 }
159 
160 static void iothread_set_poll_param(Object *obj, Visitor *v,
161         const char *name, void *opaque, Error **errp)
162 {
163     IOThread *iothread = IOTHREAD(obj);
164     PollParamInfo *info = opaque;
165     int64_t *field = (void *)iothread + info->offset;
166     Error *local_err = NULL;
167     int64_t value;
168 
169     visit_type_int64(v, name, &value, &local_err);
170     if (local_err) {
171         goto out;
172     }
173 
174     if (value < 0) {
175         error_setg(&local_err, "%s value must be in range [0, %"PRId64"]",
176                    info->name, INT64_MAX);
177         goto out;
178     }
179 
180     *field = value;
181 
182     if (iothread->ctx) {
183         aio_context_set_poll_params(iothread->ctx,
184                                     iothread->poll_max_ns,
185                                     iothread->poll_grow,
186                                     iothread->poll_shrink,
187                                     &local_err);
188     }
189 
190 out:
191     error_propagate(errp, local_err);
192 }
193 
194 static void iothread_class_init(ObjectClass *klass, void *class_data)
195 {
196     UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
197     ucc->complete = iothread_complete;
198 
199     object_class_property_add(klass, "poll-max-ns", "int",
200                               iothread_get_poll_param,
201                               iothread_set_poll_param,
202                               NULL, &poll_max_ns_info, &error_abort);
203     object_class_property_add(klass, "poll-grow", "int",
204                               iothread_get_poll_param,
205                               iothread_set_poll_param,
206                               NULL, &poll_grow_info, &error_abort);
207     object_class_property_add(klass, "poll-shrink", "int",
208                               iothread_get_poll_param,
209                               iothread_set_poll_param,
210                               NULL, &poll_shrink_info, &error_abort);
211 }
212 
213 static const TypeInfo iothread_info = {
214     .name = TYPE_IOTHREAD,
215     .parent = TYPE_OBJECT,
216     .class_init = iothread_class_init,
217     .instance_size = sizeof(IOThread),
218     .instance_finalize = iothread_instance_finalize,
219     .interfaces = (InterfaceInfo[]) {
220         {TYPE_USER_CREATABLE},
221         {}
222     },
223 };
224 
225 static void iothread_register_types(void)
226 {
227     type_register_static(&iothread_info);
228 }
229 
230 type_init(iothread_register_types)
231 
232 char *iothread_get_id(IOThread *iothread)
233 {
234     return object_get_canonical_path_component(OBJECT(iothread));
235 }
236 
237 AioContext *iothread_get_aio_context(IOThread *iothread)
238 {
239     return iothread->ctx;
240 }
241 
242 static int query_one_iothread(Object *object, void *opaque)
243 {
244     IOThreadInfoList ***prev = opaque;
245     IOThreadInfoList *elem;
246     IOThreadInfo *info;
247     IOThread *iothread;
248 
249     iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
250     if (!iothread) {
251         return 0;
252     }
253 
254     info = g_new0(IOThreadInfo, 1);
255     info->id = iothread_get_id(iothread);
256     info->thread_id = iothread->thread_id;
257 
258     elem = g_new0(IOThreadInfoList, 1);
259     elem->value = info;
260     elem->next = NULL;
261 
262     **prev = elem;
263     *prev = &elem->next;
264     return 0;
265 }
266 
267 IOThreadInfoList *qmp_query_iothreads(Error **errp)
268 {
269     IOThreadInfoList *head = NULL;
270     IOThreadInfoList **prev = &head;
271     Object *container = object_get_objects_root();
272 
273     object_child_foreach(container, query_one_iothread, &prev);
274     return head;
275 }
276 
277 void iothread_stop_all(void)
278 {
279     Object *container = object_get_objects_root();
280     BlockDriverState *bs;
281     BdrvNextIterator it;
282 
283     for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) {
284         AioContext *ctx = bdrv_get_aio_context(bs);
285         if (ctx == qemu_get_aio_context()) {
286             continue;
287         }
288         aio_context_acquire(ctx);
289         bdrv_set_aio_context(bs, qemu_get_aio_context());
290         aio_context_release(ctx);
291     }
292 
293     object_child_foreach(container, iothread_stop, NULL);
294 }
295