1 /* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 Copyright (C) 2013 Red Hat, Inc.
4
5 This library is free software; you can redistribute it and/or
6 modify it under the terms of the GNU Lesser General Public
7 License as published by the Free Software Foundation; either
8 version 2.1 of the License, or (at your option) any later version.
9
10 This library is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public
16 License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 */
18 #include "config.h"
19
20 #include <string.h>
21
22 #include "vmcstream.h"
23 #include "spice-channel-priv.h"
24 #include "gio-coroutine.h"
25
26 struct _SpiceVmcInputStream
27 {
28 GInputStream parent_instance;
29 GTask *task;
30 struct coroutine *coroutine;
31
32 SpiceChannel *channel;
33 gboolean all;
34 guint8 *buffer;
35 gsize count;
36 gsize pos;
37
38 gulong cancel_id;
39 };
40
41 struct _SpiceVmcInputStreamClass
42 {
43 GInputStreamClass parent_class;
44 };
45
46 static gssize spice_vmc_input_stream_read (GInputStream *stream,
47 void *buffer,
48 gsize count,
49 GCancellable *cancellable,
50 GError **error);
51 static void spice_vmc_input_stream_read_async (GInputStream *stream,
52 void *buffer,
53 gsize count,
54 int io_priority,
55 GCancellable *cancellable,
56 GAsyncReadyCallback callback,
57 gpointer user_data);
58 static gssize spice_vmc_input_stream_read_finish (GInputStream *stream,
59 GAsyncResult *result,
60 GError **error);
61 static gssize spice_vmc_input_stream_skip (GInputStream *stream,
62 gsize count,
63 GCancellable *cancellable,
64 GError **error);
65 static gboolean spice_vmc_input_stream_close (GInputStream *stream,
66 GCancellable *cancellable,
67 GError **error);
68
G_DEFINE_TYPE(SpiceVmcInputStream,spice_vmc_input_stream,G_TYPE_INPUT_STREAM)69 G_DEFINE_TYPE(SpiceVmcInputStream, spice_vmc_input_stream, G_TYPE_INPUT_STREAM)
70
71
72 static void
73 spice_vmc_input_stream_class_init(SpiceVmcInputStreamClass *klass)
74 {
75 GInputStreamClass *istream_class;
76
77 istream_class = G_INPUT_STREAM_CLASS(klass);
78 istream_class->read_fn = spice_vmc_input_stream_read;
79 istream_class->read_async = spice_vmc_input_stream_read_async;
80 istream_class->read_finish = spice_vmc_input_stream_read_finish;
81 istream_class->skip = spice_vmc_input_stream_skip;
82 istream_class->close_fn = spice_vmc_input_stream_close;
83 }
84
85 static void
spice_vmc_input_stream_init(SpiceVmcInputStream * self)86 spice_vmc_input_stream_init(SpiceVmcInputStream *self)
87 {
88 }
89
90 static SpiceVmcInputStream *
spice_vmc_input_stream_new(void)91 spice_vmc_input_stream_new(void)
92 {
93 SpiceVmcInputStream *self;
94
95 self = g_object_new(SPICE_TYPE_VMC_INPUT_STREAM, NULL);
96
97 return self;
98 }
99
100 typedef struct _complete_in_idle_cb_data {
101 GTask *task;
102 gssize pos;
103 } complete_in_idle_cb_data;
104
105 static gboolean
complete_in_idle_cb(gpointer user_data)106 complete_in_idle_cb(gpointer user_data)
107 {
108 complete_in_idle_cb_data *data = user_data;
109
110 g_task_return_int(data->task, data->pos);
111
112 g_object_unref (data->task);
113 g_free (data);
114
115 return FALSE;
116 }
117
118 /* coroutine */
119 /*
120 * Feed a SpiceVmc stream with new data from a coroutine
121 *
122 * The other end will be waiting on read_async() until data is fed
123 * here.
124 */
125 G_GNUC_INTERNAL void
spice_vmc_input_stream_co_data(SpiceVmcInputStream * self,const gpointer d,gsize size)126 spice_vmc_input_stream_co_data(SpiceVmcInputStream *self,
127 const gpointer d, gsize size)
128 {
129 guint8 *data = d;
130
131 g_return_if_fail(SPICE_IS_VMC_INPUT_STREAM(self));
132 g_return_if_fail(self->coroutine == NULL);
133
134 self->coroutine = coroutine_self();
135
136 while (size > 0) {
137 complete_in_idle_cb_data *cb_data;
138
139 SPICE_DEBUG("spicevmc co_data %p", self->task);
140 if (!self->task)
141 coroutine_yield(NULL);
142
143 g_return_if_fail(self->task != NULL);
144
145 gsize min = MIN(self->count, size);
146 memcpy(self->buffer, data, min);
147
148 size -= min;
149 data += min;
150
151 SPICE_DEBUG("spicevmc co_data complete: %" G_GSIZE_FORMAT
152 "/%" G_GSIZE_FORMAT, min, self->count);
153
154 self->pos += min;
155 self->buffer += min;
156
157 if (self->all && min > 0 && self->pos != self->count)
158 continue;
159
160 /* Let's deal with the task complete in idle by ourselves, as GTask
161 * heuristic only makes sense in a non-coroutine case.
162 */
163 cb_data = g_new(complete_in_idle_cb_data , 1);
164 cb_data->task = g_object_ref(self->task);
165 cb_data->pos = self->pos;
166 g_idle_add(complete_in_idle_cb, cb_data);
167
168 g_clear_object(&self->task);
169 }
170
171 self->coroutine = NULL;
172 }
173
174 static void
read_cancelled(GCancellable * cancellable,gpointer user_data)175 read_cancelled(GCancellable *cancellable,
176 gpointer user_data)
177 {
178 SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(user_data);
179
180 SPICE_DEBUG("read cancelled, %p", self->task);
181 g_task_return_new_error(self->task,
182 G_IO_ERROR, G_IO_ERROR_CANCELLED,
183 "read cancelled");
184
185 /* With GTask, we don't need to disconnect GCancellable when task is
186 * cancelled within cancellable callback as it could lead to deadlocks
187 * e.g: https://bugzilla.gnome.org/show_bug.cgi?id=705395 */
188 g_clear_object(&self->task);
189 }
190
191 G_GNUC_INTERNAL void
spice_vmc_input_stream_read_all_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)192 spice_vmc_input_stream_read_all_async(GInputStream *stream,
193 void *buffer,
194 gsize count,
195 int io_priority,
196 GCancellable *cancellable,
197 GAsyncReadyCallback callback,
198 gpointer user_data)
199 {
200 SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream);
201 GTask *task;
202
203 /* no concurrent read permitted by ginputstream */
204 g_return_if_fail(self->task == NULL);
205 self->all = TRUE;
206 self->buffer = buffer;
207 self->count = count;
208 self->pos = 0;
209 task = g_task_new(self,
210 cancellable,
211 callback,
212 user_data);
213 self->task = task;
214 if (cancellable)
215 self->cancel_id =
216 g_cancellable_connect(cancellable, G_CALLBACK(read_cancelled), self, NULL);
217
218 if (self->coroutine)
219 coroutine_yieldto(self->coroutine, NULL);
220 }
221
222 G_GNUC_INTERNAL gssize
spice_vmc_input_stream_read_all_finish(GInputStream * stream,GAsyncResult * result,GError ** error)223 spice_vmc_input_stream_read_all_finish(GInputStream *stream,
224 GAsyncResult *result,
225 GError **error)
226 {
227 GTask *task = G_TASK(result);
228 SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream);
229 GCancellable *cancel;
230
231 g_return_val_if_fail(g_task_is_valid(task, self), -1);
232 cancel = g_task_get_cancellable(task);
233 if (!g_cancellable_is_cancelled(cancel)) {
234 g_cancellable_disconnect(cancel, self->cancel_id);
235 self->cancel_id = 0;
236 }
237 return g_task_propagate_int(task, error);
238 }
239
240 static void
spice_vmc_input_stream_read_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)241 spice_vmc_input_stream_read_async(GInputStream *stream,
242 void *buffer,
243 gsize count,
244 int io_priority,
245 GCancellable *cancellable,
246 GAsyncReadyCallback callback,
247 gpointer user_data)
248 {
249 SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream);
250 GTask *task;
251
252 /* no concurrent read permitted by ginputstream */
253 g_return_if_fail(self->task == NULL);
254 self->all = FALSE;
255 self->buffer = buffer;
256 self->count = count;
257 self->pos = 0;
258
259 task = g_task_new(self, cancellable, callback, user_data);
260 self->task = task;
261 if (cancellable)
262 self->cancel_id =
263 g_cancellable_connect(cancellable, G_CALLBACK(read_cancelled), self, NULL);
264
265 if (self->coroutine)
266 coroutine_yieldto(self->coroutine, NULL);
267 }
268
269 static gssize
spice_vmc_input_stream_read_finish(GInputStream * stream,GAsyncResult * result,GError ** error)270 spice_vmc_input_stream_read_finish(GInputStream *stream,
271 GAsyncResult *result,
272 GError **error)
273 {
274 GTask *task = G_TASK(result);
275 SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream);
276 GCancellable *cancel;
277
278 g_return_val_if_fail(g_task_is_valid(task, self), -1);
279
280 cancel = g_task_get_cancellable(task);
281 if (!g_cancellable_is_cancelled(cancel)) {
282 g_cancellable_disconnect(cancel, self->cancel_id);
283 self->cancel_id = 0;
284 }
285 return g_task_propagate_int(task, error);
286 }
287
288 static gssize
spice_vmc_input_stream_read(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)289 spice_vmc_input_stream_read(GInputStream *stream,
290 void *buffer,
291 gsize count,
292 GCancellable *cancellable,
293 GError **error)
294 {
295 g_return_val_if_reached(-1);
296 }
297
298 static gssize
spice_vmc_input_stream_skip(GInputStream * stream,gsize count,GCancellable * cancellable,GError ** error)299 spice_vmc_input_stream_skip(GInputStream *stream,
300 gsize count,
301 GCancellable *cancellable,
302 GError **error)
303 {
304 g_return_val_if_reached(-1);
305 }
306
307 static gboolean
spice_vmc_input_stream_close(GInputStream * stream,GCancellable * cancellable,GError ** error)308 spice_vmc_input_stream_close(GInputStream *stream,
309 GCancellable *cancellable,
310 GError **error)
311 {
312 SPICE_DEBUG("fake close");
313 return TRUE;
314 }
315
316 /* OUTPUT */
317
318 struct _SpiceVmcOutputStream
319 {
320 GOutputStream parent_instance;
321
322 SpiceChannel *channel; /* weak */
323 };
324
325 struct _SpiceVmcOutputStreamClass
326 {
327 GOutputStreamClass parent_class;
328 };
329
330 static gssize spice_vmc_output_stream_write_fn (GOutputStream *stream,
331 const void *buffer,
332 gsize count,
333 GCancellable *cancellable,
334 GError **error);
335 static gssize spice_vmc_output_stream_write_finish (GOutputStream *stream,
336 GAsyncResult *result,
337 GError **error);
338 static void spice_vmc_output_stream_write_async (GOutputStream *stream,
339 const void *buffer,
340 gsize count,
341 int io_priority,
342 GCancellable *cancellable,
343 GAsyncReadyCallback callback,
344 gpointer user_data);
345
G_DEFINE_TYPE(SpiceVmcOutputStream,spice_vmc_output_stream,G_TYPE_OUTPUT_STREAM)346 G_DEFINE_TYPE(SpiceVmcOutputStream, spice_vmc_output_stream, G_TYPE_OUTPUT_STREAM)
347
348
349 static void
350 spice_vmc_output_stream_class_init(SpiceVmcOutputStreamClass *klass)
351 {
352 GOutputStreamClass *ostream_class;
353
354 ostream_class = G_OUTPUT_STREAM_CLASS(klass);
355 ostream_class->write_fn = spice_vmc_output_stream_write_fn;
356 ostream_class->write_async = spice_vmc_output_stream_write_async;
357 ostream_class->write_finish = spice_vmc_output_stream_write_finish;
358 }
359
360 static void
spice_vmc_output_stream_init(SpiceVmcOutputStream * self)361 spice_vmc_output_stream_init(SpiceVmcOutputStream *self)
362 {
363 }
364
365 static SpiceVmcOutputStream *
spice_vmc_output_stream_new(SpiceChannel * channel)366 spice_vmc_output_stream_new(SpiceChannel *channel)
367 {
368 SpiceVmcOutputStream *self;
369
370 self = g_object_new(SPICE_TYPE_VMC_OUTPUT_STREAM, NULL);
371 self->channel = channel;
372
373 return self;
374 }
375
376 static gssize
spice_vmc_output_stream_write_fn(GOutputStream * stream,const void * buffer,gsize count,GCancellable * cancellable,GError ** error)377 spice_vmc_output_stream_write_fn(GOutputStream *stream,
378 const void *buffer,
379 gsize count,
380 GCancellable *cancellable,
381 GError **error)
382 {
383 SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream);
384 SpiceMsgOut *msg_out;
385
386 msg_out = spice_msg_out_new(SPICE_CHANNEL(self->channel),
387 SPICE_MSGC_SPICEVMC_DATA);
388 spice_marshaller_add(msg_out->marshaller, buffer, count);
389 spice_msg_out_send(msg_out);
390
391 return count;
392 }
393
394 static gssize
spice_vmc_output_stream_write_finish(GOutputStream * stream,GAsyncResult * simple,GError ** error)395 spice_vmc_output_stream_write_finish(GOutputStream *stream,
396 GAsyncResult *simple,
397 GError **error)
398 {
399 SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream);
400 GAsyncResult *res = g_task_propagate_pointer(G_TASK(simple), error);
401 gssize bytes_written;
402
403 SPICE_DEBUG("spicevmc write finish");
404 bytes_written = spice_vmc_write_finish(self->channel, res, error);
405 g_object_unref(res);
406
407 return bytes_written;
408 }
409
410 static void
write_cb(GObject * source_object,GAsyncResult * res,gpointer user_data)411 write_cb(GObject *source_object,
412 GAsyncResult *res,
413 gpointer user_data)
414 {
415 GTask *task = user_data;
416
417 g_task_return_pointer(task, g_object_ref(res), g_object_unref);
418
419 g_object_unref(task);
420 }
421
422 static void
spice_vmc_output_stream_write_async(GOutputStream * stream,const void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)423 spice_vmc_output_stream_write_async(GOutputStream *stream,
424 const void *buffer,
425 gsize count,
426 int io_priority,
427 GCancellable *cancellable,
428 GAsyncReadyCallback callback,
429 gpointer user_data)
430 {
431 SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream);
432 GTask *task;
433
434 SPICE_DEBUG("spicevmc write async");
435 /* an AsyncResult to forward async op to channel */
436 task = g_task_new(self, cancellable, callback, user_data);
437
438 spice_vmc_write_async(self->channel, buffer, count,
439 cancellable, write_cb,
440 task);
441 }
442
443 /* STREAM */
444
445 struct _SpiceVmcStream
446 {
447 GIOStream parent_instance;
448
449 SpiceChannel *channel; /* weak */
450 SpiceVmcInputStream *in;
451 SpiceVmcOutputStream *out;
452 };
453
454 struct _SpiceVmcStreamClass
455 {
456 GIOStreamClass parent_class;
457 };
458
459 static void spice_vmc_stream_finalize (GObject *object);
460 static GInputStream * spice_vmc_stream_get_input_stream (GIOStream *stream);
461 static GOutputStream * spice_vmc_stream_get_output_stream (GIOStream *stream);
462
G_DEFINE_TYPE(SpiceVmcStream,spice_vmc_stream,G_TYPE_IO_STREAM)463 G_DEFINE_TYPE(SpiceVmcStream, spice_vmc_stream, G_TYPE_IO_STREAM)
464
465 static void
466 spice_vmc_stream_class_init(SpiceVmcStreamClass *klass)
467 {
468 GObjectClass *object_class;
469 GIOStreamClass *iostream_class;
470
471 object_class = G_OBJECT_CLASS(klass);
472 object_class->finalize = spice_vmc_stream_finalize;
473
474 iostream_class = G_IO_STREAM_CLASS(klass);
475 iostream_class->get_input_stream = spice_vmc_stream_get_input_stream;
476 iostream_class->get_output_stream = spice_vmc_stream_get_output_stream;
477 }
478
479 static void
spice_vmc_stream_finalize(GObject * object)480 spice_vmc_stream_finalize(GObject *object)
481 {
482 SpiceVmcStream *self = SPICE_VMC_STREAM(object);
483
484 g_clear_object(&self->in);
485 g_clear_object(&self->out);
486
487 G_OBJECT_CLASS(spice_vmc_stream_parent_class)->finalize(object);
488 }
489
490 static void
spice_vmc_stream_init(SpiceVmcStream * self)491 spice_vmc_stream_init(SpiceVmcStream *self)
492 {
493 }
494
495 G_GNUC_INTERNAL SpiceVmcStream *
spice_vmc_stream_new(SpiceChannel * channel)496 spice_vmc_stream_new(SpiceChannel *channel)
497 {
498 SpiceVmcStream *self;
499
500 self = g_object_new(SPICE_TYPE_VMC_STREAM, NULL);
501 self->channel = channel;
502
503 return self;
504 }
505
506 static GInputStream *
spice_vmc_stream_get_input_stream(GIOStream * stream)507 spice_vmc_stream_get_input_stream(GIOStream *stream)
508 {
509 SpiceVmcStream *self = SPICE_VMC_STREAM(stream);
510
511 if (!self->in)
512 self->in = spice_vmc_input_stream_new();
513
514 return G_INPUT_STREAM(self->in);
515 }
516
517 static GOutputStream *
spice_vmc_stream_get_output_stream(GIOStream * stream)518 spice_vmc_stream_get_output_stream(GIOStream *stream)
519 {
520 SpiceVmcStream *self = SPICE_VMC_STREAM(stream);
521
522 if (!self->out)
523 self->out = spice_vmc_output_stream_new(self->channel);
524
525 return G_OUTPUT_STREAM(self->out);
526 }
527