1 /* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3   Copyright (C) 2013 Red Hat, Inc.
4 
5   This library is free software; you can redistribute it and/or
6   modify it under the terms of the GNU Lesser General Public
7   License as published by the Free Software Foundation; either
8   version 2.1 of the License, or (at your option) any later version.
9 
10   This library is distributed in the hope that it will be useful,
11   but WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   Lesser General Public License for more details.
14 
15   You should have received a copy of the GNU Lesser General Public
16   License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 */
18 #include "config.h"
19 
20 #include <string.h>
21 
22 #include "vmcstream.h"
23 #include "spice-channel-priv.h"
24 #include "gio-coroutine.h"
25 
26 struct _SpiceVmcInputStream
27 {
28     GInputStream parent_instance;
29     GTask *task;
30     struct coroutine *coroutine;
31 
32     SpiceChannel *channel;
33     gboolean all;
34     guint8 *buffer;
35     gsize count;
36     gsize pos;
37 
38     gulong cancel_id;
39 };
40 
41 struct _SpiceVmcInputStreamClass
42 {
43     GInputStreamClass parent_class;
44 };
45 
46 static gssize   spice_vmc_input_stream_read        (GInputStream        *stream,
47                                                     void                *buffer,
48                                                     gsize                count,
49                                                     GCancellable        *cancellable,
50                                                     GError             **error);
51 static void     spice_vmc_input_stream_read_async  (GInputStream        *stream,
52                                                     void                *buffer,
53                                                     gsize                count,
54                                                     int                  io_priority,
55                                                     GCancellable        *cancellable,
56                                                     GAsyncReadyCallback  callback,
57                                                     gpointer             user_data);
58 static gssize   spice_vmc_input_stream_read_finish (GInputStream        *stream,
59                                                     GAsyncResult        *result,
60                                                     GError             **error);
61 static gssize   spice_vmc_input_stream_skip        (GInputStream        *stream,
62                                                     gsize                count,
63                                                     GCancellable        *cancellable,
64                                                     GError             **error);
65 static gboolean spice_vmc_input_stream_close       (GInputStream        *stream,
66                                                     GCancellable        *cancellable,
67                                                     GError             **error);
68 
G_DEFINE_TYPE(SpiceVmcInputStream,spice_vmc_input_stream,G_TYPE_INPUT_STREAM)69 G_DEFINE_TYPE(SpiceVmcInputStream, spice_vmc_input_stream, G_TYPE_INPUT_STREAM)
70 
71 
72 static void
73 spice_vmc_input_stream_class_init(SpiceVmcInputStreamClass *klass)
74 {
75     GInputStreamClass *istream_class;
76 
77     istream_class = G_INPUT_STREAM_CLASS(klass);
78     istream_class->read_fn = spice_vmc_input_stream_read;
79     istream_class->read_async = spice_vmc_input_stream_read_async;
80     istream_class->read_finish = spice_vmc_input_stream_read_finish;
81     istream_class->skip = spice_vmc_input_stream_skip;
82     istream_class->close_fn = spice_vmc_input_stream_close;
83 }
84 
85 static void
spice_vmc_input_stream_init(SpiceVmcInputStream * self)86 spice_vmc_input_stream_init(SpiceVmcInputStream *self)
87 {
88 }
89 
90 static SpiceVmcInputStream *
spice_vmc_input_stream_new(void)91 spice_vmc_input_stream_new(void)
92 {
93     SpiceVmcInputStream *self;
94 
95     self = g_object_new(SPICE_TYPE_VMC_INPUT_STREAM, NULL);
96 
97     return self;
98 }
99 
100 typedef struct _complete_in_idle_cb_data {
101     GTask *task;
102     gssize pos;
103 } complete_in_idle_cb_data;
104 
105 static gboolean
complete_in_idle_cb(gpointer user_data)106 complete_in_idle_cb(gpointer user_data)
107 {
108     complete_in_idle_cb_data *data = user_data;
109 
110     g_task_return_int(data->task, data->pos);
111 
112     g_object_unref (data->task);
113     g_free (data);
114 
115     return FALSE;
116 }
117 
118 /* coroutine */
119 /*
120  * Feed a SpiceVmc stream with new data from a coroutine
121  *
122  * The other end will be waiting on read_async() until data is fed
123  * here.
124  */
125 G_GNUC_INTERNAL void
spice_vmc_input_stream_co_data(SpiceVmcInputStream * self,const gpointer d,gsize size)126 spice_vmc_input_stream_co_data(SpiceVmcInputStream *self,
127                                const gpointer d, gsize size)
128 {
129     guint8 *data = d;
130 
131     g_return_if_fail(SPICE_IS_VMC_INPUT_STREAM(self));
132     g_return_if_fail(self->coroutine == NULL);
133 
134     self->coroutine = coroutine_self();
135 
136     while (size > 0) {
137         complete_in_idle_cb_data *cb_data;
138 
139         SPICE_DEBUG("spicevmc co_data %p", self->task);
140         if (!self->task)
141             coroutine_yield(NULL);
142 
143         g_return_if_fail(self->task != NULL);
144 
145         gsize min = MIN(self->count, size);
146         memcpy(self->buffer, data, min);
147 
148         size -= min;
149         data += min;
150 
151         SPICE_DEBUG("spicevmc co_data complete: %" G_GSIZE_FORMAT
152                     "/%" G_GSIZE_FORMAT, min, self->count);
153 
154         self->pos += min;
155         self->buffer += min;
156 
157         if (self->all && min > 0 && self->pos != self->count)
158             continue;
159 
160         /* Let's deal with the task complete in idle by ourselves, as GTask
161          * heuristic only makes sense in a non-coroutine case.
162          */
163         cb_data = g_new(complete_in_idle_cb_data , 1);
164         cb_data->task = g_object_ref(self->task);
165         cb_data->pos = self->pos;
166         g_idle_add(complete_in_idle_cb, cb_data);
167 
168         g_clear_object(&self->task);
169     }
170 
171     self->coroutine = NULL;
172 }
173 
174 static void
read_cancelled(GCancellable * cancellable,gpointer user_data)175 read_cancelled(GCancellable *cancellable,
176                gpointer user_data)
177 {
178     SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(user_data);
179 
180     SPICE_DEBUG("read cancelled, %p", self->task);
181     g_task_return_new_error(self->task,
182                             G_IO_ERROR, G_IO_ERROR_CANCELLED,
183                             "read cancelled");
184 
185     /* With GTask, we don't need to disconnect GCancellable when task is
186      * cancelled within cancellable callback as it could lead to deadlocks
187      * e.g: https://bugzilla.gnome.org/show_bug.cgi?id=705395 */
188     g_clear_object(&self->task);
189 }
190 
191 G_GNUC_INTERNAL void
spice_vmc_input_stream_read_all_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)192 spice_vmc_input_stream_read_all_async(GInputStream        *stream,
193                                       void                *buffer,
194                                       gsize                count,
195                                       int                  io_priority,
196                                       GCancellable        *cancellable,
197                                       GAsyncReadyCallback  callback,
198                                       gpointer             user_data)
199 {
200     SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream);
201     GTask *task;
202 
203     /* no concurrent read permitted by ginputstream */
204     g_return_if_fail(self->task == NULL);
205     self->all = TRUE;
206     self->buffer = buffer;
207     self->count = count;
208     self->pos = 0;
209     task = g_task_new(self,
210                       cancellable,
211                       callback,
212                       user_data);
213     self->task = task;
214     if (cancellable)
215         self->cancel_id =
216             g_cancellable_connect(cancellable, G_CALLBACK(read_cancelled), self, NULL);
217 
218     if (self->coroutine)
219         coroutine_yieldto(self->coroutine, NULL);
220 }
221 
222 G_GNUC_INTERNAL gssize
spice_vmc_input_stream_read_all_finish(GInputStream * stream,GAsyncResult * result,GError ** error)223 spice_vmc_input_stream_read_all_finish(GInputStream *stream,
224                                        GAsyncResult *result,
225                                        GError **error)
226 {
227     GTask *task = G_TASK(result);
228     SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream);
229     GCancellable *cancel;
230 
231     g_return_val_if_fail(g_task_is_valid(task, self), -1);
232     cancel = g_task_get_cancellable(task);
233     if (!g_cancellable_is_cancelled(cancel)) {
234          g_cancellable_disconnect(cancel, self->cancel_id);
235          self->cancel_id = 0;
236     }
237     return g_task_propagate_int(task, error);
238 }
239 
240 static void
spice_vmc_input_stream_read_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)241 spice_vmc_input_stream_read_async(GInputStream        *stream,
242                                   void                *buffer,
243                                   gsize                count,
244                                   int                  io_priority,
245                                   GCancellable        *cancellable,
246                                   GAsyncReadyCallback  callback,
247                                   gpointer             user_data)
248 {
249     SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream);
250     GTask *task;
251 
252     /* no concurrent read permitted by ginputstream */
253     g_return_if_fail(self->task == NULL);
254     self->all = FALSE;
255     self->buffer = buffer;
256     self->count = count;
257     self->pos = 0;
258 
259     task = g_task_new(self, cancellable, callback, user_data);
260     self->task = task;
261     if (cancellable)
262         self->cancel_id =
263             g_cancellable_connect(cancellable, G_CALLBACK(read_cancelled), self, NULL);
264 
265     if (self->coroutine)
266         coroutine_yieldto(self->coroutine, NULL);
267 }
268 
269 static gssize
spice_vmc_input_stream_read_finish(GInputStream * stream,GAsyncResult * result,GError ** error)270 spice_vmc_input_stream_read_finish(GInputStream *stream,
271                                    GAsyncResult *result,
272                                    GError **error)
273 {
274     GTask *task = G_TASK(result);
275     SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream);
276     GCancellable *cancel;
277 
278     g_return_val_if_fail(g_task_is_valid(task, self), -1);
279 
280     cancel = g_task_get_cancellable(task);
281     if (!g_cancellable_is_cancelled(cancel)) {
282          g_cancellable_disconnect(cancel, self->cancel_id);
283          self->cancel_id = 0;
284     }
285     return g_task_propagate_int(task, error);
286 }
287 
288 static gssize
spice_vmc_input_stream_read(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)289 spice_vmc_input_stream_read(GInputStream  *stream,
290                             void          *buffer,
291                             gsize          count,
292                             GCancellable  *cancellable,
293                             GError       **error)
294 {
295     g_return_val_if_reached(-1);
296 }
297 
298 static gssize
spice_vmc_input_stream_skip(GInputStream * stream,gsize count,GCancellable * cancellable,GError ** error)299 spice_vmc_input_stream_skip(GInputStream  *stream,
300                             gsize          count,
301                             GCancellable  *cancellable,
302                             GError       **error)
303 {
304     g_return_val_if_reached(-1);
305 }
306 
307 static gboolean
spice_vmc_input_stream_close(GInputStream * stream,GCancellable * cancellable,GError ** error)308 spice_vmc_input_stream_close(GInputStream  *stream,
309                              GCancellable  *cancellable,
310                              GError       **error)
311 {
312     SPICE_DEBUG("fake close");
313     return TRUE;
314 }
315 
316 /* OUTPUT */
317 
318 struct _SpiceVmcOutputStream
319 {
320     GOutputStream parent_instance;
321 
322     SpiceChannel *channel; /* weak */
323 };
324 
325 struct _SpiceVmcOutputStreamClass
326 {
327     GOutputStreamClass parent_class;
328 };
329 
330 static gssize   spice_vmc_output_stream_write_fn     (GOutputStream   *stream,
331                                                       const void      *buffer,
332                                                       gsize            count,
333                                                       GCancellable    *cancellable,
334                                                       GError         **error);
335 static gssize   spice_vmc_output_stream_write_finish (GOutputStream   *stream,
336                                                       GAsyncResult    *result,
337                                                       GError         **error);
338 static void     spice_vmc_output_stream_write_async  (GOutputStream   *stream,
339                                                       const void      *buffer,
340                                                       gsize            count,
341                                                       int              io_priority,
342                                                       GCancellable    *cancellable,
343                                                       GAsyncReadyCallback callback,
344                                                       gpointer         user_data);
345 
G_DEFINE_TYPE(SpiceVmcOutputStream,spice_vmc_output_stream,G_TYPE_OUTPUT_STREAM)346 G_DEFINE_TYPE(SpiceVmcOutputStream, spice_vmc_output_stream, G_TYPE_OUTPUT_STREAM)
347 
348 
349 static void
350 spice_vmc_output_stream_class_init(SpiceVmcOutputStreamClass *klass)
351 {
352     GOutputStreamClass *ostream_class;
353 
354     ostream_class = G_OUTPUT_STREAM_CLASS(klass);
355     ostream_class->write_fn = spice_vmc_output_stream_write_fn;
356     ostream_class->write_async = spice_vmc_output_stream_write_async;
357     ostream_class->write_finish = spice_vmc_output_stream_write_finish;
358 }
359 
360 static void
spice_vmc_output_stream_init(SpiceVmcOutputStream * self)361 spice_vmc_output_stream_init(SpiceVmcOutputStream *self)
362 {
363 }
364 
365 static SpiceVmcOutputStream *
spice_vmc_output_stream_new(SpiceChannel * channel)366 spice_vmc_output_stream_new(SpiceChannel *channel)
367 {
368     SpiceVmcOutputStream *self;
369 
370     self = g_object_new(SPICE_TYPE_VMC_OUTPUT_STREAM, NULL);
371     self->channel = channel;
372 
373     return self;
374 }
375 
376 static gssize
spice_vmc_output_stream_write_fn(GOutputStream * stream,const void * buffer,gsize count,GCancellable * cancellable,GError ** error)377 spice_vmc_output_stream_write_fn(GOutputStream   *stream,
378                                  const void      *buffer,
379                                  gsize            count,
380                                  GCancellable    *cancellable,
381                                  GError         **error)
382 {
383     SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream);
384     SpiceMsgOut *msg_out;
385 
386     msg_out = spice_msg_out_new(SPICE_CHANNEL(self->channel),
387                                 SPICE_MSGC_SPICEVMC_DATA);
388     spice_marshaller_add(msg_out->marshaller, buffer, count);
389     spice_msg_out_send(msg_out);
390 
391     return count;
392 }
393 
394 static gssize
spice_vmc_output_stream_write_finish(GOutputStream * stream,GAsyncResult * simple,GError ** error)395 spice_vmc_output_stream_write_finish(GOutputStream *stream,
396                                      GAsyncResult *simple,
397                                      GError **error)
398 {
399     SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream);
400     GAsyncResult *res = g_task_propagate_pointer(G_TASK(simple), error);
401     gssize bytes_written;
402 
403     SPICE_DEBUG("spicevmc write finish");
404     bytes_written = spice_vmc_write_finish(self->channel, res, error);
405     g_object_unref(res);
406 
407     return bytes_written;
408 }
409 
410 static void
write_cb(GObject * source_object,GAsyncResult * res,gpointer user_data)411 write_cb(GObject *source_object,
412          GAsyncResult *res,
413          gpointer user_data)
414 {
415     GTask *task = user_data;
416 
417     g_task_return_pointer(task, g_object_ref(res), g_object_unref);
418 
419     g_object_unref(task);
420 }
421 
422 static void
spice_vmc_output_stream_write_async(GOutputStream * stream,const void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)423 spice_vmc_output_stream_write_async(GOutputStream *stream,
424                                     const void *buffer,
425                                     gsize count,
426                                     int io_priority,
427                                     GCancellable *cancellable,
428                                     GAsyncReadyCallback callback,
429                                     gpointer user_data)
430 {
431     SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream);
432     GTask *task;
433 
434     SPICE_DEBUG("spicevmc write async");
435     /* an AsyncResult to forward async op to channel */
436     task = g_task_new(self, cancellable, callback, user_data);
437 
438     spice_vmc_write_async(self->channel, buffer, count,
439                           cancellable, write_cb,
440                           task);
441 }
442 
443 /* STREAM */
444 
445 struct _SpiceVmcStream
446 {
447     GIOStream parent_instance;
448 
449     SpiceChannel *channel; /* weak */
450     SpiceVmcInputStream *in;
451     SpiceVmcOutputStream *out;
452 };
453 
454 struct _SpiceVmcStreamClass
455 {
456     GIOStreamClass parent_class;
457 };
458 
459 static void            spice_vmc_stream_finalize          (GObject   *object);
460 static GInputStream *  spice_vmc_stream_get_input_stream  (GIOStream *stream);
461 static GOutputStream * spice_vmc_stream_get_output_stream (GIOStream *stream);
462 
G_DEFINE_TYPE(SpiceVmcStream,spice_vmc_stream,G_TYPE_IO_STREAM)463 G_DEFINE_TYPE(SpiceVmcStream, spice_vmc_stream, G_TYPE_IO_STREAM)
464 
465 static void
466 spice_vmc_stream_class_init(SpiceVmcStreamClass *klass)
467 {
468     GObjectClass *object_class;
469     GIOStreamClass *iostream_class;
470 
471     object_class = G_OBJECT_CLASS(klass);
472     object_class->finalize = spice_vmc_stream_finalize;
473 
474     iostream_class = G_IO_STREAM_CLASS(klass);
475     iostream_class->get_input_stream = spice_vmc_stream_get_input_stream;
476     iostream_class->get_output_stream = spice_vmc_stream_get_output_stream;
477 }
478 
479 static void
spice_vmc_stream_finalize(GObject * object)480 spice_vmc_stream_finalize(GObject *object)
481 {
482     SpiceVmcStream *self = SPICE_VMC_STREAM(object);
483 
484     g_clear_object(&self->in);
485     g_clear_object(&self->out);
486 
487     G_OBJECT_CLASS(spice_vmc_stream_parent_class)->finalize(object);
488 }
489 
490 static void
spice_vmc_stream_init(SpiceVmcStream * self)491 spice_vmc_stream_init(SpiceVmcStream *self)
492 {
493 }
494 
495 G_GNUC_INTERNAL SpiceVmcStream *
spice_vmc_stream_new(SpiceChannel * channel)496 spice_vmc_stream_new(SpiceChannel *channel)
497 {
498     SpiceVmcStream *self;
499 
500     self = g_object_new(SPICE_TYPE_VMC_STREAM, NULL);
501     self->channel = channel;
502 
503     return self;
504 }
505 
506 static GInputStream *
spice_vmc_stream_get_input_stream(GIOStream * stream)507 spice_vmc_stream_get_input_stream(GIOStream *stream)
508 {
509     SpiceVmcStream *self = SPICE_VMC_STREAM(stream);
510 
511     if (!self->in)
512         self->in = spice_vmc_input_stream_new();
513 
514     return G_INPUT_STREAM(self->in);
515 }
516 
517 static GOutputStream *
spice_vmc_stream_get_output_stream(GIOStream * stream)518 spice_vmc_stream_get_output_stream(GIOStream *stream)
519 {
520     SpiceVmcStream *self = SPICE_VMC_STREAM(stream);
521 
522     if (!self->out)
523         self->out = spice_vmc_output_stream_new(self->channel);
524 
525     return G_OUTPUT_STREAM(self->out);
526 }
527