xref: /qemu/io/task.c (revision abff1abf)
1 /*
2  * QEMU I/O task
3  *
4  * Copyright (c) 2015 Red Hat, Inc.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18  *
19  */
20 
21 #include "qemu/osdep.h"
22 #include "io/task.h"
23 #include "qapi/error.h"
24 #include "qemu/thread.h"
25 #include "qom/object.h"
26 #include "trace.h"
27 
28 struct QIOTaskThreadData {
29     QIOTaskWorker worker;
30     gpointer opaque;
31     GDestroyNotify destroy;
32     GMainContext *context;
33     GSource *completion;
34 };
35 
36 
37 struct QIOTask {
38     Object *source;
39     QIOTaskFunc func;
40     gpointer opaque;
41     GDestroyNotify destroy;
42     Error *err;
43     gpointer result;
44     GDestroyNotify destroyResult;
45     QemuMutex thread_lock;
46     QemuCond thread_cond;
47     struct QIOTaskThreadData *thread;
48 };
49 
50 
51 QIOTask *qio_task_new(Object *source,
52                       QIOTaskFunc func,
53                       gpointer opaque,
54                       GDestroyNotify destroy)
55 {
56     QIOTask *task;
57 
58     task = g_new0(QIOTask, 1);
59 
60     task->source = source;
61     object_ref(source);
62     task->func = func;
63     task->opaque = opaque;
64     task->destroy = destroy;
65     qemu_mutex_init(&task->thread_lock);
66     qemu_cond_init(&task->thread_cond);
67 
68     trace_qio_task_new(task, source, func, opaque);
69 
70     return task;
71 }
72 
73 static void qio_task_free(QIOTask *task)
74 {
75     qemu_mutex_lock(&task->thread_lock);
76     if (task->thread) {
77         if (task->thread->destroy) {
78             task->thread->destroy(task->thread->opaque);
79         }
80 
81         if (task->thread->context) {
82             g_main_context_unref(task->thread->context);
83         }
84 
85         g_free(task->thread);
86     }
87 
88     if (task->destroy) {
89         task->destroy(task->opaque);
90     }
91     if (task->destroyResult) {
92         task->destroyResult(task->result);
93     }
94     if (task->err) {
95         error_free(task->err);
96     }
97     object_unref(task->source);
98 
99     qemu_mutex_unlock(&task->thread_lock);
100     qemu_mutex_destroy(&task->thread_lock);
101     qemu_cond_destroy(&task->thread_cond);
102 
103     g_free(task);
104 }
105 
106 
107 static gboolean qio_task_thread_result(gpointer opaque)
108 {
109     QIOTask *task = opaque;
110 
111     trace_qio_task_thread_result(task);
112     qio_task_complete(task);
113 
114     return FALSE;
115 }
116 
117 
118 static gpointer qio_task_thread_worker(gpointer opaque)
119 {
120     QIOTask *task = opaque;
121 
122     trace_qio_task_thread_run(task);
123 
124     task->thread->worker(task, task->thread->opaque);
125 
126     /* We're running in the background thread, and must only
127      * ever report the task results in the main event loop
128      * thread. So we schedule an idle callback to report
129      * the worker results
130      */
131     trace_qio_task_thread_exit(task);
132 
133     qemu_mutex_lock(&task->thread_lock);
134 
135     task->thread->completion = g_idle_source_new();
136     g_source_set_callback(task->thread->completion,
137                           qio_task_thread_result, task, NULL);
138     g_source_attach(task->thread->completion,
139                     task->thread->context);
140     g_source_unref(task->thread->completion);
141     trace_qio_task_thread_source_attach(task, task->thread->completion);
142 
143     qemu_cond_signal(&task->thread_cond);
144     qemu_mutex_unlock(&task->thread_lock);
145 
146     return NULL;
147 }
148 
149 
150 void qio_task_run_in_thread(QIOTask *task,
151                             QIOTaskWorker worker,
152                             gpointer opaque,
153                             GDestroyNotify destroy,
154                             GMainContext *context)
155 {
156     struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1);
157     QemuThread thread;
158 
159     if (context) {
160         g_main_context_ref(context);
161     }
162 
163     data->worker = worker;
164     data->opaque = opaque;
165     data->destroy = destroy;
166     data->context = context;
167 
168     task->thread = data;
169 
170     trace_qio_task_thread_start(task, worker, opaque);
171     qemu_thread_create(&thread,
172                        "io-task-worker",
173                        qio_task_thread_worker,
174                        task,
175                        QEMU_THREAD_DETACHED);
176 }
177 
178 
179 void qio_task_wait_thread(QIOTask *task)
180 {
181     qemu_mutex_lock(&task->thread_lock);
182     g_assert(task->thread != NULL);
183     while (task->thread->completion == NULL) {
184         qemu_cond_wait(&task->thread_cond, &task->thread_lock);
185     }
186 
187     trace_qio_task_thread_source_cancel(task, task->thread->completion);
188     g_source_destroy(task->thread->completion);
189     qemu_mutex_unlock(&task->thread_lock);
190 
191     qio_task_thread_result(task);
192 }
193 
194 
195 void qio_task_complete(QIOTask *task)
196 {
197     task->func(task, task->opaque);
198     trace_qio_task_complete(task);
199     qio_task_free(task);
200 }
201 
202 
203 void qio_task_set_error(QIOTask *task,
204                         Error *err)
205 {
206     error_propagate(&task->err, err);
207 }
208 
209 
210 bool qio_task_propagate_error(QIOTask *task,
211                               Error **errp)
212 {
213     if (task->err) {
214         error_propagate(errp, task->err);
215         task->err = NULL;
216         return true;
217     }
218 
219     return false;
220 }
221 
222 
223 void qio_task_set_result_pointer(QIOTask *task,
224                                  gpointer result,
225                                  GDestroyNotify destroy)
226 {
227     task->result = result;
228     task->destroyResult = destroy;
229 }
230 
231 
232 gpointer qio_task_get_result_pointer(QIOTask *task)
233 {
234     return task->result;
235 }
236 
237 
238 Object *qio_task_get_source(QIOTask *task)
239 {
240     return task->source;
241 }
242