xref: /qemu/io/task.c (revision cae9fc56)
1 /*
2  * QEMU I/O task
3  *
4  * Copyright (c) 2015 Red Hat, Inc.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18  *
19  */
20 
21 #include "qemu/osdep.h"
22 #include "io/task.h"
23 #include "qemu/thread.h"
24 #include "trace.h"
25 
26 struct QIOTask {
27     Object *source;
28     QIOTaskFunc func;
29     gpointer opaque;
30     GDestroyNotify destroy;
31 };
32 
33 
34 QIOTask *qio_task_new(Object *source,
35                       QIOTaskFunc func,
36                       gpointer opaque,
37                       GDestroyNotify destroy)
38 {
39     QIOTask *task;
40 
41     task = g_new0(QIOTask, 1);
42 
43     task->source = source;
44     object_ref(source);
45     task->func = func;
46     task->opaque = opaque;
47     task->destroy = destroy;
48 
49     trace_qio_task_new(task, source, func, opaque);
50 
51     return task;
52 }
53 
54 static void qio_task_free(QIOTask *task)
55 {
56     if (task->destroy) {
57         task->destroy(task->opaque);
58     }
59     object_unref(task->source);
60 
61     g_free(task);
62 }
63 
64 
65 struct QIOTaskThreadData {
66     QIOTask *task;
67     QIOTaskWorker worker;
68     gpointer opaque;
69     GDestroyNotify destroy;
70     Error *err;
71     int ret;
72 };
73 
74 
75 static gboolean gio_task_thread_result(gpointer opaque)
76 {
77     struct QIOTaskThreadData *data = opaque;
78 
79     trace_qio_task_thread_result(data->task);
80     if (data->ret == 0) {
81         qio_task_complete(data->task);
82     } else {
83         qio_task_abort(data->task, data->err);
84     }
85 
86     error_free(data->err);
87     if (data->destroy) {
88         data->destroy(data->opaque);
89     }
90 
91     g_free(data);
92 
93     return FALSE;
94 }
95 
96 
97 static gpointer qio_task_thread_worker(gpointer opaque)
98 {
99     struct QIOTaskThreadData *data = opaque;
100 
101     trace_qio_task_thread_run(data->task);
102     data->ret = data->worker(data->task, &data->err, data->opaque);
103     if (data->ret < 0 && data->err == NULL) {
104         error_setg(&data->err, "Task worker failed but did not set an error");
105     }
106 
107     /* We're running in the background thread, and must only
108      * ever report the task results in the main event loop
109      * thread. So we schedule an idle callback to report
110      * the worker results
111      */
112     trace_qio_task_thread_exit(data->task);
113     g_idle_add(gio_task_thread_result, data);
114     return NULL;
115 }
116 
117 
118 void qio_task_run_in_thread(QIOTask *task,
119                             QIOTaskWorker worker,
120                             gpointer opaque,
121                             GDestroyNotify destroy)
122 {
123     struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1);
124     QemuThread thread;
125 
126     data->task = task;
127     data->worker = worker;
128     data->opaque = opaque;
129     data->destroy = destroy;
130 
131     trace_qio_task_thread_start(task, worker, opaque);
132     qemu_thread_create(&thread,
133                        "io-task-worker",
134                        qio_task_thread_worker,
135                        data,
136                        QEMU_THREAD_DETACHED);
137 }
138 
139 
140 void qio_task_complete(QIOTask *task)
141 {
142     task->func(task->source, NULL, task->opaque);
143     trace_qio_task_complete(task);
144     qio_task_free(task);
145 }
146 
147 void qio_task_abort(QIOTask *task,
148                     Error *err)
149 {
150     task->func(task->source, err, task->opaque);
151     trace_qio_task_abort(task);
152     qio_task_free(task);
153 }
154 
155 
156 Object *qio_task_get_source(QIOTask *task)
157 {
158     object_ref(task->source);
159     return task->source;
160 }
161