1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22 
23 #include <config.h>
24 
25 #include <sys/types.h>
26 #include <sys/stat.h>
27 #include <fcntl.h>
28 #include <unistd.h>
29 #include <errno.h>
30 #include <string.h>
31 #include <sys/socket.h>
32 #include <sys/un.h>
33 
34 #include <glib.h>
35 #include <glib/gstdio.h>
36 #include <glib/gi18n-lib.h>
37 #include <gio/gio.h>
38 #include <gio/gunixinputstream.h>
39 #include <gio/gunixoutputstream.h>
40 #include "gdaemonfileoutputstream.h"
41 #include "gvfsdaemondbus.h"
42 #include <gvfsdaemonprotocol.h>
43 #include <gvfsfileinfo.h>
44 
45 #define MAX_WRITE_SIZE (4*1024*1024)
46 
/* What a state machine iterator asks its driver to do next.
   STATE_OP_DONE ends the operation; the other values request a read or
   skip on the data stream, or a write on the command stream. */
typedef enum {
  STATE_OP_DONE,
  STATE_OP_READ,
  STATE_OP_WRITE,
  STATE_OP_SKIP
} StateOp;
53 
/* States of the write state machine (see iterate_write_state_machine). */
typedef enum {
  WRITE_STATE_INIT = 0,       /* nothing queued yet */
  WRITE_STATE_WROTE_COMMAND,  /* flushing output_buffer (WRITE or CANCEL request) */
  WRITE_STATE_SEND_DATA,      /* sending the caller's data after the request */
  WRITE_STATE_HANDLE_INPUT    /* reading and decoding replies */
} WriteState;
60 
/* Per-call state of one write operation. */
typedef struct {
  WriteState state;

  /* Output: data to send after the WRITE request */
  const char *buffer;
  gsize buffer_size;
  gsize buffer_pos;    /* number of bytes of buffer sent so far */

  /* Input: result decoded from the matching reply */
  gssize ret_val;      /* byte count from the WRITTEN reply, or -1 on error/cancel */
  GError *ret_error;   /* set when ret_val is -1 */

  gboolean sent_cancel; /* TRUE once a CANCEL request has been queued */

  guint32 seq_nr;       /* sequence number of the WRITE request, used to match replies */
} WriteOperation;
77 
/* States of the seek state machine (see iterate_seek_state_machine). */
typedef enum {
  SEEK_STATE_INIT = 0,        /* nothing queued yet */
  SEEK_STATE_WROTE_REQUEST,   /* flushing output_buffer (seek or CANCEL request) */
  SEEK_STATE_HANDLE_INPUT     /* reading and decoding replies */
} SeekState;
83 
/* Per-call state of one seek operation. */
typedef struct {
  SeekState state;

  /* Output: what is sent in the seek request */
  goffset offset;
  GSeekType seek_type;
  /* Input: result decoded from the matching reply */
  gboolean ret_val;    /* TRUE on a SEEK_POS reply, FALSE on an ERROR reply */
  GError *ret_error;
  goffset ret_offset;  /* 64-bit position assembled from the SEEK_POS reply */

  gboolean sent_cancel; /* TRUE once a CANCEL request has been queued */

  guint32 seq_nr;       /* sequence number of the seek request, used to match replies */
} SeekOperation;
99 
/* States of the truncate operation. The names mirror the seek and close
   state enums; the state machine itself is not in this part of the file. */
typedef enum {
  TRUNCATE_STATE_INIT = 0,
  TRUNCATE_STATE_WROTE_REQUEST,
  TRUNCATE_STATE_HANDLE_INPUT
} TruncateState;
105 
/* Per-call state of one truncate operation. */
typedef struct {
  TruncateState state;

  /* Output */
  goffset size;        /* presumably the requested new size -- TODO confirm against the truncate state machine */
  /* Input */
  gboolean ret_val;
  GError *ret_error;

  gboolean sent_cancel;

  guint32 seq_nr;
} TruncateOperation;
119 
/* States of the close state machine (see iterate_close_state_machine). */
typedef enum {
  CLOSE_STATE_INIT = 0,        /* nothing queued yet */
  CLOSE_STATE_WROTE_REQUEST,   /* flushing output_buffer (CLOSE or CANCEL request) */
  CLOSE_STATE_HANDLE_INPUT     /* reading and decoding replies */
} CloseState;
125 
/* Per-call state of one close operation. */
typedef struct {
  CloseState state;

  /* Output: the close request carries no arguments */

  /* Input: result decoded from the matching reply */
  gboolean ret_val;    /* TRUE on a CLOSED reply, FALSE on error/cancel */
  GError *ret_error;

  gboolean sent_cancel; /* TRUE once a CANCEL request has been queued */

  guint32 seq_nr;       /* sequence number of the CLOSE request, used to match replies */
} CloseOperation;
139 
/* States of the query-info operation. The names mirror the seek and close
   state enums; the state machine itself is not in this part of the file. */
typedef enum {
  QUERY_STATE_INIT = 0,
  QUERY_STATE_WROTE_REQUEST,
  QUERY_STATE_HANDLE_INPUT,
} QueryState;
145 
/* Per-call state of one query-info operation.
   Note that "Input"/"Output" here are from the caller's point of view,
   unlike the Write/Seek/Close operation structs. */
typedef struct {
  QueryState state;

  /* Input */
  char *attributes;    /* owned; released by query_operation_free() */

  /* Output */
  GFileInfo *info;     /* not released by query_operation_free() */
  GError *ret_error;

  gboolean sent_cancel;

  guint32 seq_nr;
} QueryOperation;
160 
/* Mailbox shared between a state machine iterator and its driver.
   The iterator fills in the request (io_buffer, io_size, io_allow_cancel);
   the driver fills in the outcome (io_res, io_cancelled) and the current
   cancellation status (cancelled) before calling the iterator again. */
typedef struct {
  gboolean cancelled;        /* the operation's cancellable is currently cancelled */

  char *io_buffer;           /* buffer for the requested read or write */
  gsize io_size;             /* number of bytes requested */
  gsize io_res;              /* number of bytes actually transferred (0 if cancelled) */
  /* The operation always succeeds, or gets cancelled.
     If we get an error doing the i/o that is considered fatal */
  gboolean io_allow_cancel;  /* whether the requested i/o may be interrupted by cancellation */
  gboolean io_cancelled;     /* the last requested i/o was interrupted by cancellation */
} IOOperationData;
172 
/* Signature shared by the iterate_*_state_machine functions: advance the
   operation in `data` and return the next i/o the driver should perform. */
typedef StateOp (*state_machine_iterator) (GDaemonFileOutputStream *file, IOOperationData *io_op, gpointer data);
174 
/* Instance data of the daemon-backed file output stream. */
struct _GDaemonFileOutputStream {
  GFileOutputStream parent;

  GOutputStream *command_stream;  /* requests and data are written here */
  GInputStream *data_stream;      /* replies are read from here (same fd as command_stream) */
  gboolean can_seek;              /* from OPEN_FOR_WRITE_FLAG_CAN_SEEK */
  gboolean can_truncate;          /* from OPEN_FOR_WRITE_FLAG_CAN_TRUNCATE */

  guint32 seq_nr;                 /* sequence number for the next request; starts at 1 */
  goffset current_offset;         /* position reported by tell(); advanced by successful writes */

  gsize input_block_size;         /* NOTE(review): not used in the visible part of the file */
  GString *input_buffer;          /* reply bytes received so far */

  GString *output_buffer;         /* request bytes queued but not yet written */

  char *etag;                     /* taken from the payload of the CLOSED reply, if any */

};
194 
/* Forward declarations of the GOutputStream / GFileOutputStream virtual
   method implementations that class_init installs. */
static gssize     g_daemon_file_output_stream_write             (GOutputStream        *stream,
								 const void           *buffer,
								 gsize                 count,
								 GCancellable         *cancellable,
								 GError              **error);
static gboolean   g_daemon_file_output_stream_close             (GOutputStream        *stream,
								 GCancellable         *cancellable,
								 GError              **error);
static GFileInfo *g_daemon_file_output_stream_query_info        (GFileOutputStream    *stream,
								 const char           *attributes,
								 GCancellable         *cancellable,
								 GError              **error);
static char      *g_daemon_file_output_stream_get_etag          (GFileOutputStream    *stream);
static goffset    g_daemon_file_output_stream_tell              (GFileOutputStream    *stream);
static gboolean   g_daemon_file_output_stream_can_seek          (GFileOutputStream    *stream);
static gboolean   g_daemon_file_output_stream_seek              (GFileOutputStream    *stream,
								 goffset               offset,
								 GSeekType             type,
								 GCancellable         *cancellable,
								 GError              **error);
static gboolean   g_daemon_file_output_stream_can_truncate      (GFileOutputStream    *stream);
static gboolean   g_daemon_file_output_stream_truncate          (GFileOutputStream    *stream,
								 goffset               size,
								 GCancellable         *cancellable,
								 GError              **error);
static void       g_daemon_file_output_stream_write_async       (GOutputStream        *stream,
								 const void           *buffer,
								 gsize                 count,
								 int                   io_priority,
								 GCancellable         *cancellable,
								 GAsyncReadyCallback   callback,
								 gpointer              data);
static gssize     g_daemon_file_output_stream_write_finish      (GOutputStream        *stream,
								 GAsyncResult         *result,
								 GError              **error);
static void       g_daemon_file_output_stream_close_async       (GOutputStream        *stream,
								 int                   io_priority,
								 GCancellable         *cancellable,
								 GAsyncReadyCallback   callback,
								 gpointer              data);
static gboolean   g_daemon_file_output_stream_close_finish      (GOutputStream        *stream,
								 GAsyncResult         *result,
								 GError              **error);
static void       g_daemon_file_output_stream_query_info_async  (GFileOutputStream    *stream,
								 const char           *attributes,
								 int                   io_priority,
								 GCancellable         *cancellable,
								 GAsyncReadyCallback   callback,
								 gpointer              user_data);
static GFileInfo *g_daemon_file_output_stream_query_info_finish (GFileOutputStream    *stream,
								 GAsyncResult         *result,
								 GError              **error);
247 
248 
249 
/* Register GDaemonFileOutputStream as a subclass of GFileOutputStream. */
G_DEFINE_TYPE (GDaemonFileOutputStream, g_daemon_file_output_stream,
	       G_TYPE_FILE_OUTPUT_STREAM)
252 
253 static void
254 query_operation_free (QueryOperation *op)
255 {
256   g_free (op->attributes);
257   g_free (op);
258 }
259 
260 static void
g_string_remove_in_front(GString * string,gsize bytes)261 g_string_remove_in_front (GString *string,
262 			  gsize bytes)
263 {
264   memmove (string->str,
265 	   string->str + bytes,
266 	   string->len - bytes);
267   g_string_truncate (string,
268 		     string->len - bytes);
269 }
270 
271 static void
g_daemon_file_output_stream_finalize(GObject * object)272 g_daemon_file_output_stream_finalize (GObject *object)
273 {
274   GDaemonFileOutputStream *file;
275 
276   file = G_DAEMON_FILE_OUTPUT_STREAM (object);
277 
278   if (file->command_stream)
279     g_object_unref (file->command_stream);
280   if (file->data_stream)
281     g_object_unref (file->data_stream);
282 
283   g_string_free (file->input_buffer, TRUE);
284   g_string_free (file->output_buffer, TRUE);
285 
286   g_free (file->etag);
287 
288   if (G_OBJECT_CLASS (g_daemon_file_output_stream_parent_class)->finalize)
289     (*G_OBJECT_CLASS (g_daemon_file_output_stream_parent_class)->finalize) (object);
290 }
291 
292 static void
g_daemon_file_output_stream_class_init(GDaemonFileOutputStreamClass * klass)293 g_daemon_file_output_stream_class_init (GDaemonFileOutputStreamClass *klass)
294 {
295   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
296   GOutputStreamClass *stream_class = G_OUTPUT_STREAM_CLASS (klass);
297   GFileOutputStreamClass *file_stream_class = G_FILE_OUTPUT_STREAM_CLASS (klass);
298 
299   gobject_class->finalize = g_daemon_file_output_stream_finalize;
300 
301   stream_class->write_fn = g_daemon_file_output_stream_write;
302   stream_class->close_fn = g_daemon_file_output_stream_close;
303 
304   stream_class->write_async = g_daemon_file_output_stream_write_async;
305   stream_class->write_finish = g_daemon_file_output_stream_write_finish;
306   stream_class->close_async = g_daemon_file_output_stream_close_async;
307   stream_class->close_finish = g_daemon_file_output_stream_close_finish;
308 
309   file_stream_class->tell = g_daemon_file_output_stream_tell;
310   file_stream_class->can_seek = g_daemon_file_output_stream_can_seek;
311   file_stream_class->seek = g_daemon_file_output_stream_seek;
312   file_stream_class->can_truncate = g_daemon_file_output_stream_can_truncate;
313   file_stream_class->truncate_fn = g_daemon_file_output_stream_truncate;
314   file_stream_class->query_info = g_daemon_file_output_stream_query_info;
315   file_stream_class->get_etag = g_daemon_file_output_stream_get_etag;
316   file_stream_class->query_info_async = g_daemon_file_output_stream_query_info_async;
317   file_stream_class->query_info_finish = g_daemon_file_output_stream_query_info_finish;
318 }
319 
320 static void
g_daemon_file_output_stream_init(GDaemonFileOutputStream * info)321 g_daemon_file_output_stream_init (GDaemonFileOutputStream *info)
322 {
323   info->output_buffer = g_string_new ("");
324   info->input_buffer = g_string_new ("");
325   info->seq_nr = 1;
326 }
327 
328 GFileOutputStream *
g_daemon_file_output_stream_new(int fd,guint32 flags,goffset initial_offset)329 g_daemon_file_output_stream_new (int fd,
330 				 guint32 flags,
331 				 goffset initial_offset)
332 {
333   GDaemonFileOutputStream *stream;
334 
335   stream = g_object_new (G_TYPE_DAEMON_FILE_OUTPUT_STREAM, NULL);
336 
337   stream->command_stream = g_unix_output_stream_new (fd, FALSE);
338   stream->data_stream = g_unix_input_stream_new (fd, TRUE);
339   stream->can_seek = flags & OPEN_FOR_WRITE_FLAG_CAN_SEEK;
340   stream->can_truncate = flags & OPEN_FOR_WRITE_FLAG_CAN_TRUNCATE;
341   stream->current_offset = initial_offset;
342 
343   return G_FILE_OUTPUT_STREAM (stream);
344 }
345 
346 static gboolean
error_is_cancel(GError * error)347 error_is_cancel (GError *error)
348 {
349   return error != NULL &&
350     error->domain == G_IO_ERROR &&
351     error->code == G_IO_ERROR_CANCELLED;
352 }
353 
354 static void
unappend_request(GDaemonFileOutputStream * stream)355 unappend_request (GDaemonFileOutputStream *stream)
356 {
357   g_assert (stream->output_buffer->len >= G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE);
358   stream->seq_nr--;
359   g_string_truncate (stream->output_buffer,
360 		     stream->output_buffer->len - G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE);
361 }
362 
363 static void
append_request(GDaemonFileOutputStream * stream,guint32 command,guint32 arg1,guint32 arg2,guint32 data_len,guint32 * seq_nr)364 append_request (GDaemonFileOutputStream *stream, guint32 command,
365 		guint32 arg1, guint32 arg2, guint32 data_len, guint32 *seq_nr)
366 {
367   GVfsDaemonSocketProtocolRequest cmd;
368 
369   g_assert (sizeof (cmd) == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE);
370 
371   if (seq_nr)
372     *seq_nr = stream->seq_nr;
373 
374   cmd.command = g_htonl (command);
375   cmd.seq_nr = g_htonl (stream->seq_nr);
376   cmd.arg1 = g_htonl (arg1);
377   cmd.arg2 = g_htonl (arg2);
378   cmd.data_len = g_htonl (data_len);
379 
380   stream->seq_nr++;
381 
382   g_string_append_len (stream->output_buffer,
383 		       (char *)&cmd, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE);
384 }
385 
386 static gsize
get_reply_header_missing_bytes(GString * buffer)387 get_reply_header_missing_bytes (GString *buffer)
388 {
389   GVfsDaemonSocketProtocolReply *reply;
390   guint32 type;
391   guint32 arg2;
392 
393   if (buffer->len < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE)
394     return G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE - buffer->len;
395 
396   reply = (GVfsDaemonSocketProtocolReply *)buffer->str;
397 
398   type = g_ntohl (reply->type);
399   arg2 = g_ntohl (reply->arg2);
400 
401   /* ERROR, CLOSED and INFO has extra data w/ len in arg2 */
402   if (type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR ||
403       type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED ||
404       type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_INFO)
405     return G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE + arg2 - buffer->len;
406   return 0;
407 }
408 
409 static char *
decode_reply(GString * buffer,GVfsDaemonSocketProtocolReply * reply_out)410 decode_reply (GString *buffer, GVfsDaemonSocketProtocolReply *reply_out)
411 {
412   GVfsDaemonSocketProtocolReply *reply;
413   reply = (GVfsDaemonSocketProtocolReply *)buffer->str;
414   reply_out->type = g_ntohl (reply->type);
415   reply_out->seq_nr = g_ntohl (reply->seq_nr);
416   reply_out->arg1 = g_ntohl (reply->arg1);
417   reply_out->arg2 = g_ntohl (reply->arg2);
418 
419   return buffer->str + G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE;
420 }
421 
422 static void
decode_error(GVfsDaemonSocketProtocolReply * reply,char * data,GError ** error)423 decode_error (GVfsDaemonSocketProtocolReply *reply, char *data, GError **error)
424 {
425   g_set_error_literal (error,
426 		       g_quark_from_string (data),
427 		       reply->arg1,
428 		       data + strlen (data) + 1);
429 }
430 
431 
432 static gboolean
run_sync_state_machine(GDaemonFileOutputStream * file,state_machine_iterator iterator,gpointer data,GCancellable * cancellable,GError ** error)433 run_sync_state_machine (GDaemonFileOutputStream *file,
434 			state_machine_iterator iterator,
435 			gpointer data,
436 			GCancellable *cancellable,
437 			GError **error)
438 {
439   gssize res;
440   StateOp io_op;
441   IOOperationData io_data;
442   GError *io_error;
443 
444   memset (&io_data, 0, sizeof (io_data));
445 
446   while (TRUE)
447     {
448       if (cancellable)
449 	io_data.cancelled = g_cancellable_is_cancelled (cancellable);
450 
451       io_op = iterator (file, &io_data, data);
452 
453       if (io_op == STATE_OP_DONE)
454 	return TRUE;
455 
456       io_error = NULL;
457       if (io_op == STATE_OP_READ)
458 	{
459 	  res = g_input_stream_read (file->data_stream,
460 				     io_data.io_buffer, io_data.io_size,
461 				     io_data.io_allow_cancel ? cancellable : NULL,
462 				     &io_error);
463 	}
464       else if (io_op == STATE_OP_SKIP)
465 	{
466 	  res = g_input_stream_skip (file->data_stream,
467 				     io_data.io_size,
468 				     io_data.io_allow_cancel ? cancellable : NULL,
469 				     &io_error);
470 	}
471       else if (io_op == STATE_OP_WRITE)
472 	{
473 	  res = g_output_stream_write (file->command_stream,
474 				       io_data.io_buffer, io_data.io_size,
475 				       io_data.io_allow_cancel ? cancellable : NULL,
476 				       &io_error);
477 	}
478       else
479 	{
480 	  res = 0;
481 	  g_assert_not_reached ();
482 	}
483 
484       if (res == -1)
485 	{
486 	  if (error_is_cancel (io_error))
487 	    {
488 	      io_data.io_res = 0;
489 	      io_data.io_cancelled = TRUE;
490 	      g_error_free (io_error);
491 	    }
492 	  else
493 	    {
494 	      g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
495 			   _("Error in stream protocol: %s"), io_error->message);
496 	      g_error_free (io_error);
497 	      return FALSE;
498 	    }
499 	}
500       else if (res == 0 && io_data.io_size != 0)
501 	{
502 	  g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
503 		       _("Error in stream protocol: %s"), _("End of stream"));
504 	  return FALSE;
505 	}
506       else
507 	{
508 	  io_data.io_res = res;
509 	  io_data.io_cancelled = FALSE;
510 	}
511     }
512 }
513 
/* write cycle:

   create a WRITE request packet, append to outgoing
   flush outgoing, then send the data buffer
   start processing input, looking for a WRITTEN reply or an error
    with the same seq nr
   on cancel, send cancel command and go back to processing input
 */
523 
524 static StateOp
iterate_write_state_machine(GDaemonFileOutputStream * file,IOOperationData * io_op,WriteOperation * op)525 iterate_write_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, WriteOperation *op)
526 {
527   gsize len;
528 
529   while (TRUE)
530     {
531       switch (op->state)
532 	{
533 	  /* Initial state for read op */
534 	case WRITE_STATE_INIT:
535 	  append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_WRITE,
536 			  op->buffer_size, 0, op->buffer_size, &op->seq_nr);
537 	  op->state = WRITE_STATE_WROTE_COMMAND;
538 	  io_op->io_buffer = file->output_buffer->str;
539 	  io_op->io_size = file->output_buffer->len;
540 	  io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
541 	  return STATE_OP_WRITE;
542 
543 	  /* wrote parts of output_buffer */
544 	case WRITE_STATE_WROTE_COMMAND:
545 	  if (io_op->io_cancelled)
546 	    {
547 	      if (!op->sent_cancel)
548 		unappend_request (file);
549 	      op->ret_val = -1;
550 	      g_set_error_literal (&op->ret_error,
551 				   G_IO_ERROR,
552 				   G_IO_ERROR_CANCELLED,
553                         	   _("Operation was cancelled"));
554 	      return STATE_OP_DONE;
555 	    }
556 
557 	  if (io_op->io_res < file->output_buffer->len)
558 	    {
559 	      g_string_remove_in_front (file->output_buffer,
560 					io_op->io_res);
561 	      io_op->io_buffer = file->output_buffer->str;
562 	      io_op->io_size = file->output_buffer->len;
563 	      io_op->io_allow_cancel = FALSE;
564 	      return STATE_OP_WRITE;
565 	    }
566 	  g_string_truncate (file->output_buffer, 0);
567 
568 	  op->buffer_pos = 0;
569 	  if (op->sent_cancel)
570 	    op->state = WRITE_STATE_HANDLE_INPUT;
571 	  else
572 	    op->state = WRITE_STATE_SEND_DATA;
573 	  break;
574 
575 	  /* No op */
576 	case WRITE_STATE_SEND_DATA:
577 	  op->buffer_pos += io_op->io_res;
578 
579 	  if (op->buffer_pos < op->buffer_size)
580 	    {
581 	      io_op->io_buffer = (char *)(op->buffer + op->buffer_pos);
582 	      io_op->io_size = op->buffer_size - op->buffer_pos;
583 	      io_op->io_allow_cancel = FALSE;
584 	      return STATE_OP_WRITE;
585 	    }
586 
587 	  op->state = WRITE_STATE_HANDLE_INPUT;
588 	  break;
589 
590 	  /* No op */
591 	case WRITE_STATE_HANDLE_INPUT:
592 	  if (io_op->cancelled && !op->sent_cancel)
593 	    {
594 	      op->sent_cancel = TRUE;
595 	      append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
596 			      op->seq_nr, 0, 0, NULL);
597 	      op->state = WRITE_STATE_WROTE_COMMAND;
598 	      io_op->io_buffer = file->output_buffer->str;
599 	      io_op->io_size = file->output_buffer->len;
600 	      io_op->io_allow_cancel = FALSE;
601 	      return STATE_OP_WRITE;
602 	    }
603 
604 	  if (io_op->io_res > 0)
605 	    {
606 	      gsize unread_size = io_op->io_size - io_op->io_res;
607 	      g_string_set_size (file->input_buffer,
608 				 file->input_buffer->len - unread_size);
609 	    }
610 
611 	  len = get_reply_header_missing_bytes (file->input_buffer);
612 	  if (len > 0)
613 	    {
614 	      gsize current_len = file->input_buffer->len;
615 	      g_string_set_size (file->input_buffer,
616 				 current_len + len);
617 	      io_op->io_buffer = file->input_buffer->str + current_len;
618 	      io_op->io_size = len;
619 	      io_op->io_allow_cancel = !op->sent_cancel;
620 	      return STATE_OP_READ;
621 	    }
622 
623 	  /* Got full header */
624 
625 	  {
626 	    GVfsDaemonSocketProtocolReply reply;
627 	    char *data;
628 	    data = decode_reply (file->input_buffer, &reply);
629 
630 	    if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
631 		reply.seq_nr == op->seq_nr)
632 	      {
633 		op->ret_val = -1;
634 		decode_error (&reply, data, &op->ret_error);
635 		g_string_truncate (file->input_buffer, 0);
636 		return STATE_OP_DONE;
637 	      }
638 	    else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_WRITTEN &&
639 		     reply.seq_nr == op->seq_nr)
640 	      {
641 		op->ret_val = reply.arg1;
642 		g_string_truncate (file->input_buffer, 0);
643 		return STATE_OP_DONE;
644 	      }
645 	    /* Ignore other reply types */
646 	  }
647 
648 	  g_string_truncate (file->input_buffer, 0);
649 
650 	  /* This wasn't interesting, read next reply */
651 	  op->state = WRITE_STATE_HANDLE_INPUT;
652 	  break;
653 
654 	default:
655 	  g_assert_not_reached ();
656 	}
657 
658       /* Clear io_op between non-op state switches */
659       io_op->io_size = 0;
660       io_op->io_res = 0;
661       io_op->io_cancelled = FALSE;
662 
663     }
664 }
665 
666 static gssize
g_daemon_file_output_stream_write(GOutputStream * stream,const void * buffer,gsize count,GCancellable * cancellable,GError ** error)667 g_daemon_file_output_stream_write (GOutputStream *stream,
668 				   const void   *buffer,
669 				   gsize         count,
670 				   GCancellable *cancellable,
671 				   GError      **error)
672 {
673   GDaemonFileOutputStream *file;
674   WriteOperation op;
675 
676   file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
677 
678   if (g_cancellable_set_error_if_cancelled (cancellable, error))
679     return -1;
680 
681   /* Limit for sanity and to avoid 32bit overflow */
682   if (count > MAX_WRITE_SIZE)
683     count = MAX_WRITE_SIZE;
684 
685   memset (&op, 0, sizeof (op));
686   op.state = WRITE_STATE_INIT;
687   op.buffer = buffer;
688   op.buffer_size = count;
689 
690   if (!run_sync_state_machine (file, (state_machine_iterator)iterate_write_state_machine,
691 			       &op, cancellable, error))
692     return -1; /* IO Error */
693 
694   if (op.ret_val == -1)
695     g_propagate_error (error, op.ret_error);
696   else
697     file->current_offset += op.ret_val;
698 
699   return op.ret_val;
700 }
701 
702 static StateOp
iterate_close_state_machine(GDaemonFileOutputStream * file,IOOperationData * io_op,CloseOperation * op)703 iterate_close_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, CloseOperation *op)
704 {
705   gsize len;
706 
707   while (TRUE)
708     {
709       switch (op->state)
710 	{
711 	  /* Initial state for read op */
712 	case CLOSE_STATE_INIT:
713 	  append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CLOSE,
714 			  0, 0, 0, &op->seq_nr);
715 	  op->state = CLOSE_STATE_WROTE_REQUEST;
716 	  io_op->io_buffer = file->output_buffer->str;
717 	  io_op->io_size = file->output_buffer->len;
718 	  io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
719 	  return STATE_OP_WRITE;
720 
721 	  /* wrote parts of output_buffer */
722 	case CLOSE_STATE_WROTE_REQUEST:
723 	  if (io_op->io_cancelled)
724 	    {
725 	      if (!op->sent_cancel)
726 		unappend_request (file);
727 	      op->ret_val = FALSE;
728 	      g_set_error_literal (&op->ret_error,
729 				   G_IO_ERROR,
730 				   G_IO_ERROR_CANCELLED,
731 				   _("Operation was cancelled"));
732 	      return STATE_OP_DONE;
733 	    }
734 
735 	  if (io_op->io_res < file->output_buffer->len)
736 	    {
737 	      g_string_remove_in_front (file->output_buffer,
738 					io_op->io_res);
739 	      io_op->io_buffer = file->output_buffer->str;
740 	      io_op->io_size = file->output_buffer->len;
741 	      io_op->io_allow_cancel = FALSE;
742 	      return STATE_OP_WRITE;
743 	    }
744 	  g_string_truncate (file->output_buffer, 0);
745 
746 	  op->state = CLOSE_STATE_HANDLE_INPUT;
747 	  break;
748 
749 	  /* No op */
750 	case CLOSE_STATE_HANDLE_INPUT:
751 	  if (io_op->cancelled && !op->sent_cancel)
752 	    {
753 	      op->sent_cancel = TRUE;
754 	      append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
755 			      op->seq_nr, 0, 0, NULL);
756 	      op->state = CLOSE_STATE_WROTE_REQUEST;
757 	      io_op->io_buffer = file->output_buffer->str;
758 	      io_op->io_size = file->output_buffer->len;
759 	      io_op->io_allow_cancel = FALSE;
760 	      return STATE_OP_WRITE;
761 	    }
762 
763 	  if (io_op->io_res > 0)
764 	    {
765 	      gsize unread_size = io_op->io_size - io_op->io_res;
766 	      g_string_set_size (file->input_buffer,
767 				 file->input_buffer->len - unread_size);
768 	    }
769 
770 	  len = get_reply_header_missing_bytes (file->input_buffer);
771 	  if (len > 0)
772 	    {
773 	      gsize current_len = file->input_buffer->len;
774 	      g_string_set_size (file->input_buffer,
775 				 current_len + len);
776 	      io_op->io_buffer = file->input_buffer->str + current_len;
777 	      io_op->io_size = len;
778 	      io_op->io_allow_cancel = !op->sent_cancel;
779 	      return STATE_OP_READ;
780 	    }
781 
782 	  /* Got full header */
783 
784 	  {
785 	    GVfsDaemonSocketProtocolReply reply;
786 	    char *data;
787 	    data = decode_reply (file->input_buffer, &reply);
788 
789 	    if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
790 		reply.seq_nr == op->seq_nr)
791 	      {
792 		op->ret_val = FALSE;
793 		decode_error (&reply, data, &op->ret_error);
794 		g_string_truncate (file->input_buffer, 0);
795 		return STATE_OP_DONE;
796 	      }
797 	    else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED &&
798 		     reply.seq_nr == op->seq_nr)
799 	      {
800 		op->ret_val = TRUE;
801 		if (reply.arg2 > 0)
802 		  file->etag = g_strndup (data, reply.arg2);
803 		g_string_truncate (file->input_buffer, 0);
804 		return STATE_OP_DONE;
805 	      }
806 	    /* Ignore other reply types */
807 	  }
808 
809 	  g_string_truncate (file->input_buffer, 0);
810 
811 	  /* This wasn't interesting, read next reply */
812 	  op->state = CLOSE_STATE_HANDLE_INPUT;
813 	  break;
814 
815 	default:
816 	  g_assert_not_reached ();
817 	}
818 
819       /* Clear io_op between non-op state switches */
820       io_op->io_size = 0;
821       io_op->io_res = 0;
822       io_op->io_cancelled = FALSE;
823     }
824 }
825 
826 
827 static gboolean
g_daemon_file_output_stream_close(GOutputStream * stream,GCancellable * cancellable,GError ** error)828 g_daemon_file_output_stream_close (GOutputStream *stream,
829 				  GCancellable *cancellable,
830 				  GError      **error)
831 {
832   GDaemonFileOutputStream *file;
833   CloseOperation op;
834   gboolean res;
835 
836   file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
837 
838   /* We need to do a full roundtrip to guarantee that the writes have
839      reached the disk. */
840 
841   memset (&op, 0, sizeof (op));
842   op.state = CLOSE_STATE_INIT;
843 
844   if (!run_sync_state_machine (file, (state_machine_iterator)iterate_close_state_machine,
845 			       &op, cancellable, error))
846     res = FALSE;
847   else
848     {
849       if (!op.ret_val)
850 	g_propagate_error (error, op.ret_error);
851       res = op.ret_val;
852     }
853 
854   /* Return the first error, but close all streams */
855   if (res)
856     res = g_output_stream_close (file->command_stream, cancellable, error);
857   else
858     g_output_stream_close (file->command_stream, cancellable, NULL);
859 
860   if (res)
861     res = g_input_stream_close (file->data_stream, cancellable, error);
862   else
863     g_input_stream_close (file->data_stream, cancellable, NULL);
864 
865   return res;
866 }
867 
868 static goffset
g_daemon_file_output_stream_tell(GFileOutputStream * stream)869 g_daemon_file_output_stream_tell (GFileOutputStream *stream)
870 {
871   GDaemonFileOutputStream *file;
872 
873   file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
874 
875   return file->current_offset;
876 }
877 
878 static gboolean
g_daemon_file_output_stream_can_seek(GFileOutputStream * stream)879 g_daemon_file_output_stream_can_seek (GFileOutputStream *stream)
880 {
881   GDaemonFileOutputStream *file;
882 
883   file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
884 
885   return file->can_seek;
886 }
887 
888 static StateOp
iterate_seek_state_machine(GDaemonFileOutputStream * file,IOOperationData * io_op,SeekOperation * op)889 iterate_seek_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, SeekOperation *op)
890 {
891   gsize len;
892   guint32 request;
893 
894   while (TRUE)
895     {
896       switch (op->state)
897 	{
898 	  /* Initial state for read op */
899 	case SEEK_STATE_INIT:
900 	  request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_SET;
901 	  if (op->seek_type == G_SEEK_CUR)
902 	    op->offset = file->current_offset + op->offset;
903 	  else if (op->seek_type == G_SEEK_END)
904 	    request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END;
905 	  append_request (file, request,
906 			  op->offset & 0xffffffff,
907 			  op->offset >> 32,
908 			  0,
909 			  &op->seq_nr);
910 	  op->state = SEEK_STATE_WROTE_REQUEST;
911 	  io_op->io_buffer = file->output_buffer->str;
912 	  io_op->io_size = file->output_buffer->len;
913 	  io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
914 	  return STATE_OP_WRITE;
915 
916 	  /* wrote parts of output_buffer */
917 	case SEEK_STATE_WROTE_REQUEST:
918 	  if (io_op->io_cancelled)
919 	    {
920 	      if (!op->sent_cancel)
921 		unappend_request (file);
922 	      op->ret_val = -1;
923 	      g_set_error_literal (&op->ret_error,
924 				   G_IO_ERROR,
925 				   G_IO_ERROR_CANCELLED,
926 				   _("Operation was cancelled"));
927 	      return STATE_OP_DONE;
928 	    }
929 
930 	  if (io_op->io_res < file->output_buffer->len)
931 	    {
932 	      g_string_remove_in_front (file->output_buffer,
933 					io_op->io_res);
934 	      io_op->io_buffer = file->output_buffer->str;
935 	      io_op->io_size = file->output_buffer->len;
936 	      io_op->io_allow_cancel = FALSE;
937 	      return STATE_OP_WRITE;
938 	    }
939 	  g_string_truncate (file->output_buffer, 0);
940 
941 	  op->state = SEEK_STATE_HANDLE_INPUT;
942 	  break;
943 
944 	  /* No op */
945 	case SEEK_STATE_HANDLE_INPUT:
946 	  if (io_op->cancelled && !op->sent_cancel)
947 	    {
948 	      op->sent_cancel = TRUE;
949 	      append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
950 			      op->seq_nr, 0, 0, NULL);
951 	      op->state = SEEK_STATE_WROTE_REQUEST;
952 	      io_op->io_buffer = file->output_buffer->str;
953 	      io_op->io_size = file->output_buffer->len;
954 	      io_op->io_allow_cancel = FALSE;
955 	      return STATE_OP_WRITE;
956 	    }
957 
958 	  if (io_op->io_res > 0)
959 	    {
960 	      gsize unread_size = io_op->io_size - io_op->io_res;
961 	      g_string_set_size (file->input_buffer,
962 				 file->input_buffer->len - unread_size);
963 	    }
964 
965 	  len = get_reply_header_missing_bytes (file->input_buffer);
966 	  if (len > 0)
967 	    {
968 	      gsize current_len = file->input_buffer->len;
969 	      g_string_set_size (file->input_buffer,
970 				 current_len + len);
971 	      io_op->io_buffer = file->input_buffer->str + current_len;
972 	      io_op->io_size = len;
973 	      io_op->io_allow_cancel = !op->sent_cancel;
974 	      return STATE_OP_READ;
975 	    }
976 
977 	  /* Got full header */
978 
979 	  {
980 	    GVfsDaemonSocketProtocolReply reply;
981 	    char *data;
982 	    data = decode_reply (file->input_buffer, &reply);
983 
984 	    if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
985 		reply.seq_nr == op->seq_nr)
986 	      {
987 		op->ret_val = FALSE;
988 		decode_error (&reply, data, &op->ret_error);
989 		g_string_truncate (file->input_buffer, 0);
990 		return STATE_OP_DONE;
991 	      }
992 	    else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SEEK_POS &&
993 		     reply.seq_nr == op->seq_nr)
994 	      {
995 		op->ret_val = TRUE;
996 		op->ret_offset = ((goffset)reply.arg2) << 32 | (goffset)reply.arg1;
997 		g_string_truncate (file->input_buffer, 0);
998 		return STATE_OP_DONE;
999 	      }
1000 	    /* Ignore other reply types */
1001 	  }
1002 
1003 	  g_string_truncate (file->input_buffer, 0);
1004 
1005 	  /* This wasn't interesting, read next reply */
1006 	  op->state = SEEK_STATE_HANDLE_INPUT;
1007 	  break;
1008 
1009 	default:
1010 	  g_assert_not_reached ();
1011 	}
1012 
1013       /* Clear io_op between non-op state switches */
1014       io_op->io_size = 0;
1015       io_op->io_res = 0;
1016       io_op->io_cancelled = FALSE;
1017     }
1018 }
1019 
1020 static gboolean
g_daemon_file_output_stream_seek(GFileOutputStream * stream,goffset offset,GSeekType type,GCancellable * cancellable,GError ** error)1021 g_daemon_file_output_stream_seek (GFileOutputStream *stream,
1022 				 goffset offset,
1023 				 GSeekType type,
1024 				 GCancellable *cancellable,
1025 				 GError **error)
1026 {
1027   GDaemonFileOutputStream *file;
1028   SeekOperation op;
1029 
1030   file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
1031 
1032   if (!file->can_seek)
1033     {
1034       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
1035 			   _("Seek not supported on stream"));
1036       return FALSE;
1037     }
1038 
1039   if (g_cancellable_set_error_if_cancelled (cancellable, error))
1040     return FALSE;
1041 
1042   memset (&op, 0, sizeof (op));
1043   op.state = SEEK_STATE_INIT;
1044   op.offset = offset;
1045   op.seek_type = type;
1046 
1047   if (!run_sync_state_machine (file, (state_machine_iterator)iterate_seek_state_machine,
1048 			       &op, cancellable, error))
1049     return FALSE; /* IO Error */
1050 
1051   if (!op.ret_val)
1052     g_propagate_error (error, op.ret_error);
1053   else
1054     file->current_offset = op.ret_offset;
1055 
1056   return op.ret_val;
1057 }
1058 
1059 static StateOp
iterate_truncate_state_machine(GDaemonFileOutputStream * file,IOOperationData * io_op,TruncateOperation * op)1060 iterate_truncate_state_machine (GDaemonFileOutputStream *file,
1061                                 IOOperationData *io_op,
1062                                 TruncateOperation *op)
1063 {
1064   gsize len;
1065 
1066   while (TRUE)
1067     {
1068       switch (op->state)
1069         {
1070         case TRUNCATE_STATE_INIT:
1071           append_request (file,
1072                           G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_TRUNCATE,
1073                           op->size & 0xffffffff,
1074                           op->size >> 32,
1075                           0,
1076                           &op->seq_nr);
1077           op->state = TRUNCATE_STATE_WROTE_REQUEST;
1078           io_op->io_buffer = file->output_buffer->str;
1079           io_op->io_size = file->output_buffer->len;
1080           io_op->io_allow_cancel = TRUE;
1081           return STATE_OP_WRITE;
1082 
1083         case TRUNCATE_STATE_WROTE_REQUEST:
1084           if (io_op->io_cancelled)
1085             {
1086               if (!op->sent_cancel)
1087                 unappend_request (file);
1088               op->ret_val = FALSE;
1089               g_set_error_literal (&op->ret_error,
1090                                    G_IO_ERROR,
1091                                    G_IO_ERROR_CANCELLED,
1092                                    _("Operation was cancelled"));
1093               return STATE_OP_DONE;
1094             }
1095 
1096           if (io_op->io_res < file->output_buffer->len)
1097             {
1098               g_string_remove_in_front (file->output_buffer, io_op->io_res);
1099               io_op->io_buffer = file->output_buffer->str;
1100               io_op->io_size = file->output_buffer->len;
1101               io_op->io_allow_cancel = FALSE;
1102               return STATE_OP_WRITE;
1103             }
1104           g_string_truncate (file->output_buffer, 0);
1105 
1106           op->state = TRUNCATE_STATE_HANDLE_INPUT;
1107           break;
1108 
1109         case TRUNCATE_STATE_HANDLE_INPUT:
1110           if (io_op->cancelled && !op->sent_cancel)
1111             {
1112               op->sent_cancel = TRUE;
1113               append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
1114                               op->seq_nr, 0, 0, NULL);
1115               op->state = TRUNCATE_STATE_WROTE_REQUEST;
1116               io_op->io_buffer = file->output_buffer->str;
1117               io_op->io_size = file->output_buffer->len;
1118               io_op->io_allow_cancel = FALSE;
1119               return STATE_OP_WRITE;
1120             }
1121 
1122           if (io_op->io_res > 0)
1123             {
1124               gsize unread_size = io_op->io_size - io_op->io_res;
1125               g_string_set_size (file->input_buffer,
1126                                  file->input_buffer->len - unread_size);
1127             }
1128 
1129           len = get_reply_header_missing_bytes (file->input_buffer);
1130           if (len > 0)
1131             {
1132               gsize current_len = file->input_buffer->len;
1133               g_string_set_size (file->input_buffer, current_len + len);
1134               io_op->io_buffer = file->input_buffer->str + current_len;
1135               io_op->io_size = len;
1136               io_op->io_allow_cancel = !op->sent_cancel;
1137               return STATE_OP_READ;
1138             }
1139 
1140           {
1141             GVfsDaemonSocketProtocolReply reply;
1142             char *data;
1143             data = decode_reply (file->input_buffer, &reply);
1144 
1145             if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
1146                 reply.seq_nr == op->seq_nr)
1147               {
1148                 op->ret_val = FALSE;
1149                 decode_error (&reply, data, &op->ret_error);
1150                 g_string_truncate (file->input_buffer, 0);
1151                 return STATE_OP_DONE;
1152               }
1153             else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_TRUNCATED &&
1154                      reply.seq_nr == op->seq_nr)
1155               {
1156                 op->ret_val = TRUE;
1157                 g_string_truncate (file->input_buffer, 0);
1158                 return STATE_OP_DONE;
1159               }
1160             /* Ignore other reply types */
1161           }
1162 
1163           g_string_truncate (file->input_buffer, 0);
1164 
1165           /* This wasn't interesting, read next reply */
1166           op->state = TRUNCATE_STATE_HANDLE_INPUT;
1167           break;
1168 
1169         default:
1170           g_assert_not_reached ();
1171         }
1172 
1173       /* Clear io_op between non-op state switches */
1174       io_op->io_size = 0;
1175       io_op->io_res = 0;
1176       io_op->io_cancelled = FALSE;
1177     }
1178 }
1179 
1180 static gboolean
g_daemon_file_output_stream_can_truncate(GFileOutputStream * stream)1181 g_daemon_file_output_stream_can_truncate (GFileOutputStream *stream)
1182 {
1183   GDaemonFileOutputStream *file;
1184 
1185   file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
1186 
1187   return file->can_truncate;
1188 }
1189 
1190 static gboolean
g_daemon_file_output_stream_truncate(GFileOutputStream * stream,goffset size,GCancellable * cancellable,GError ** error)1191 g_daemon_file_output_stream_truncate (GFileOutputStream *stream,
1192                                       goffset            size,
1193                                       GCancellable      *cancellable,
1194                                       GError           **error)
1195 {
1196   GDaemonFileOutputStream *file;
1197   TruncateOperation op;
1198 
1199   file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
1200 
1201   if (!file->can_truncate)
1202     {
1203       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
1204                            _("Truncate not supported on stream"));
1205       return FALSE;
1206     }
1207 
1208   if (g_cancellable_set_error_if_cancelled (cancellable, error))
1209     return FALSE;
1210 
1211   memset (&op, 0, sizeof (op));
1212   op.state = TRUNCATE_STATE_INIT;
1213   op.size = size;
1214 
1215   if (!run_sync_state_machine (file, (state_machine_iterator)iterate_truncate_state_machine,
1216                                &op, cancellable, error))
1217     return FALSE; /* IO Error */
1218 
1219   if (!op.ret_val)
1220     g_propagate_error (error, op.ret_error);
1221 
1222   return op.ret_val;
1223 }
1224 
1225 static char *
g_daemon_file_output_stream_get_etag(GFileOutputStream * stream)1226 g_daemon_file_output_stream_get_etag (GFileOutputStream     *stream)
1227 {
1228   GDaemonFileOutputStream *file;
1229 
1230   file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
1231 
1232   return g_strdup (file->etag);
1233 }
1234 
1235 static StateOp
iterate_query_state_machine(GDaemonFileOutputStream * file,IOOperationData * io_op,QueryOperation * op)1236 iterate_query_state_machine (GDaemonFileOutputStream *file,
1237 			     IOOperationData *io_op,
1238 			     QueryOperation *op)
1239 {
1240   gsize len;
1241   guint32 request;
1242 
1243   while (TRUE)
1244     {
1245       switch (op->state)
1246 	{
1247 	  /* Initial state for read op */
1248 	case QUERY_STATE_INIT:
1249 	  request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_QUERY_INFO;
1250 	  append_request (file, request,
1251 			  0,
1252 			  0,
1253 			  strlen (op->attributes),
1254 			  &op->seq_nr);
1255 	  g_string_append (file->output_buffer,
1256 			   op->attributes);
1257 
1258 	  op->state = QUERY_STATE_WROTE_REQUEST;
1259 	  io_op->io_buffer = file->output_buffer->str;
1260 	  io_op->io_size = file->output_buffer->len;
1261 	  io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
1262 	  return STATE_OP_WRITE;
1263 
1264 	  /* wrote parts of output_buffer */
1265 	case QUERY_STATE_WROTE_REQUEST:
1266 	  if (io_op->io_cancelled)
1267 	    {
1268 	      if (!op->sent_cancel)
1269 		unappend_request (file);
1270 	      op->info = NULL;
1271 	      g_set_error_literal (&op->ret_error,
1272 				   G_IO_ERROR,
1273 				   G_IO_ERROR_CANCELLED,
1274 				   _("Operation was cancelled"));
1275 	      return STATE_OP_DONE;
1276 	    }
1277 
1278 	  if (io_op->io_res < file->output_buffer->len)
1279 	    {
1280 	      g_string_remove_in_front (file->output_buffer,
1281 					io_op->io_res);
1282 	      io_op->io_buffer = file->output_buffer->str;
1283 	      io_op->io_size = file->output_buffer->len;
1284 	      io_op->io_allow_cancel = FALSE;
1285 	      return STATE_OP_WRITE;
1286 	    }
1287 	  g_string_truncate (file->output_buffer, 0);
1288 
1289 	  op->state = QUERY_STATE_HANDLE_INPUT;
1290 	  break;
1291 
1292 	  /* No op */
1293 	case QUERY_STATE_HANDLE_INPUT:
1294 	  if (io_op->cancelled && !op->sent_cancel)
1295 	    {
1296 	      op->sent_cancel = TRUE;
1297 	      append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
1298 			      op->seq_nr, 0, 0, NULL);
1299 	      op->state = QUERY_STATE_WROTE_REQUEST;
1300 	      io_op->io_buffer = file->output_buffer->str;
1301 	      io_op->io_size = file->output_buffer->len;
1302 	      io_op->io_allow_cancel = FALSE;
1303 	      return STATE_OP_WRITE;
1304 	    }
1305 
1306 
1307 	  if (io_op->io_res > 0)
1308 	    {
1309 	      gsize unread_size = io_op->io_size - io_op->io_res;
1310 	      g_string_set_size (file->input_buffer,
1311 				 file->input_buffer->len - unread_size);
1312 	    }
1313 
1314 	  len = get_reply_header_missing_bytes (file->input_buffer);
1315 	  if (len > 0)
1316 	    {
1317 	      gsize current_len = file->input_buffer->len;
1318 	      g_string_set_size (file->input_buffer,
1319 				 current_len + len);
1320 	      io_op->io_buffer = file->input_buffer->str + current_len;
1321 	      io_op->io_size = len;
1322 	      io_op->io_allow_cancel = !op->sent_cancel;
1323 	      return STATE_OP_READ;
1324 	    }
1325 
1326 	  /* Got full header */
1327 
1328 	  {
1329 	    GVfsDaemonSocketProtocolReply reply;
1330 	    char *data;
1331 	    data = decode_reply (file->input_buffer, &reply);
1332 
1333 	    if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
1334 		reply.seq_nr == op->seq_nr)
1335 	      {
1336 		op->info = NULL;
1337 		decode_error (&reply, data, &op->ret_error);
1338 		g_string_truncate (file->input_buffer, 0);
1339 		return STATE_OP_DONE;
1340 	      }
1341 	    else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_INFO &&
1342 		     reply.seq_nr == op->seq_nr)
1343 	      {
1344 		op->info = gvfs_file_info_demarshal (data, reply.arg2);
1345 		g_string_truncate (file->input_buffer, 0);
1346 		return STATE_OP_DONE;
1347 	      }
1348 	    /* Ignore other reply types */
1349 	  }
1350 
1351 	  g_string_truncate (file->input_buffer, 0);
1352 
1353 	  /* This wasn't interesting, read next reply */
1354 	  op->state = QUERY_STATE_HANDLE_INPUT;
1355 	  break;
1356 
1357 	default:
1358 	  g_assert_not_reached ();
1359 	}
1360 
1361       /* Clear io_op between non-op state switches */
1362       io_op->io_size = 0;
1363       io_op->io_res = 0;
1364       io_op->io_cancelled = FALSE;
1365     }
1366 }
1367 
1368 static GFileInfo *
g_daemon_file_output_stream_query_info(GFileOutputStream * stream,const char * attributes,GCancellable * cancellable,GError ** error)1369 g_daemon_file_output_stream_query_info (GFileOutputStream    *stream,
1370 					const char           *attributes,
1371 					GCancellable         *cancellable,
1372 					GError              **error)
1373 {
1374    GDaemonFileOutputStream *file;
1375    QueryOperation op;
1376 
1377   file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
1378 
1379   if (g_cancellable_set_error_if_cancelled (cancellable, error))
1380     return NULL;
1381 
1382   memset (&op, 0, sizeof (op));
1383   op.state = QUERY_STATE_INIT;
1384   if (attributes)
1385     op.attributes = (char *)attributes;
1386   else
1387     op.attributes = "";
1388 
1389   if (!run_sync_state_machine (file, (state_machine_iterator)iterate_query_state_machine,
1390 			       &op, cancellable, error))
1391     return NULL; /* IO Error */
1392 
1393   if (op.info == NULL)
1394     g_propagate_error (error, op.ret_error);
1395 
1396   return op.info;
1397 }
1398 
1399 /************************************************************************
1400  *         Async I/O Code                                               *
1401  ************************************************************************/
1402 
/* Per-operation bookkeeping for driving a state machine asynchronously. */
typedef struct AsyncIterator AsyncIterator;

/* Completion hook, called with the task once the state machine returns
 * STATE_OP_DONE. */
typedef void (*AsyncIteratorDone) (GTask *task);

struct AsyncIterator {
  AsyncIteratorDone done_cb;        /* invoked by async_iterate() on STATE_OP_DONE */
  IOOperationData io_data;          /* I/O request/result exchanged with the state machine */
  state_machine_iterator iterator;  /* the state machine step function */
  GTask *task;                      /* task carrying the source stream, cancellable and operation data */
};

/* Forward declaration: async_op_handle() re-enters the iteration. */
static void async_iterate (AsyncIterator *iterator);
1415 
1416 static void
async_op_handle(AsyncIterator * iterator,gssize res,GError * io_error)1417 async_op_handle (AsyncIterator *iterator,
1418 		 gssize res,
1419 		 GError *io_error)
1420 {
1421   IOOperationData *io_data = &iterator->io_data;
1422 
1423   if (io_error != NULL)
1424     {
1425       if (error_is_cancel (io_error))
1426 	{
1427 	  io_data->io_res = 0;
1428 	  io_data->io_cancelled = TRUE;
1429 	}
1430       else
1431 	{
1432         g_task_return_new_error (iterator->task, G_IO_ERROR, G_IO_ERROR_FAILED,
1433                                  _("Error in stream protocol: %s"), io_error->message);
1434         g_object_unref (iterator->task);
1435         g_free (iterator);
1436         return;
1437 	}
1438     }
1439   else if (res == 0 && io_data->io_size != 0)
1440     {
1441       g_task_return_new_error (iterator->task, G_IO_ERROR, G_IO_ERROR_FAILED,
1442                                _("Error in stream protocol: %s"), _("End of stream"));
1443       g_object_unref (iterator->task);
1444       g_free (iterator);
1445       return;
1446     }
1447   else
1448     {
1449       io_data->io_res = res;
1450       io_data->io_cancelled = FALSE;
1451     }
1452 
1453   async_iterate (iterator);
1454 }
1455 
1456 static void
async_read_op_callback(GObject * source_object,GAsyncResult * res,gpointer user_data)1457 async_read_op_callback (GObject *source_object,
1458 			GAsyncResult *res,
1459 			gpointer      user_data)
1460 {
1461   GInputStream *stream = G_INPUT_STREAM (source_object);
1462   gssize count_read;
1463   GError *error = NULL;
1464 
1465   count_read = g_input_stream_read_finish (stream, res, &error);
1466 
1467   async_op_handle ((AsyncIterator *)user_data, count_read, error);
1468   if (error)
1469     g_error_free (error);
1470 }
1471 
1472 static void
async_skip_op_callback(GObject * source_object,GAsyncResult * res,gpointer user_data)1473 async_skip_op_callback (GObject *source_object,
1474 			GAsyncResult *res,
1475 			gpointer      user_data)
1476 {
1477   GInputStream *stream = G_INPUT_STREAM (source_object);
1478   gssize count_skipped;
1479   GError *error = NULL;
1480 
1481   count_skipped = g_input_stream_skip_finish (stream, res, &error);
1482 
1483   async_op_handle ((AsyncIterator *)user_data, count_skipped, error);
1484   if (error)
1485     g_error_free (error);
1486 }
1487 
1488 static void
async_write_op_callback(GObject * source_object,GAsyncResult * res,gpointer user_data)1489 async_write_op_callback (GObject *source_object,
1490 			 GAsyncResult *res,
1491 			 gpointer user_data)
1492 {
1493   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1494   gssize bytes_written;
1495   GError *error = NULL;
1496 
1497   bytes_written = g_output_stream_write_finish (stream, res, &error);
1498 
1499   async_op_handle ((AsyncIterator *)user_data, bytes_written, error);
1500   if (error)
1501     g_error_free (error);
1502 }
1503 
1504 static void
async_iterate(AsyncIterator * iterator)1505 async_iterate (AsyncIterator *iterator)
1506 {
1507   IOOperationData *io_data = &iterator->io_data;
1508   GDaemonFileOutputStream *file;
1509   GCancellable *cancellable = g_task_get_cancellable (iterator->task);
1510   StateOp io_op;
1511 
1512   io_data->cancelled = g_cancellable_is_cancelled (cancellable);
1513 
1514   file = G_DAEMON_FILE_OUTPUT_STREAM (g_task_get_source_object (iterator->task));
1515   io_op = iterator->iterator (file, io_data, g_task_get_task_data (iterator->task));
1516 
1517   if (io_op == STATE_OP_DONE)
1518     {
1519       iterator->done_cb (iterator->task);
1520       g_free (iterator);
1521       return;
1522     }
1523 
1524   /* TODO: Handle allow_cancel... */
1525 
1526   if (io_op == STATE_OP_READ)
1527     {
1528       g_input_stream_read_async (file->data_stream,
1529 				 io_data->io_buffer, io_data->io_size,
1530 				 g_task_get_priority (iterator->task),
1531 				 io_data->io_allow_cancel ? cancellable : NULL,
1532 				 async_read_op_callback, iterator);
1533     }
1534   else if (io_op == STATE_OP_SKIP)
1535     {
1536       g_input_stream_skip_async (file->data_stream,
1537 				 io_data->io_size,
1538 				 g_task_get_priority (iterator->task),
1539 				 io_data->io_allow_cancel ? cancellable : NULL,
1540 				 async_skip_op_callback, iterator);
1541     }
1542   else if (io_op == STATE_OP_WRITE)
1543     {
1544       g_output_stream_write_async (file->command_stream,
1545 				   io_data->io_buffer, io_data->io_size,
1546 				   g_task_get_priority (iterator->task),
1547 				   io_data->io_allow_cancel ? cancellable : NULL,
1548 				   async_write_op_callback, iterator);
1549     }
1550   else
1551     g_assert_not_reached ();
1552 }
1553 
1554 static void
run_async_state_machine(GTask * task,state_machine_iterator iterator_cb,AsyncIteratorDone done_cb)1555 run_async_state_machine (GTask *task,
1556 			 state_machine_iterator iterator_cb,
1557 			 AsyncIteratorDone done_cb)
1558 {
1559   AsyncIterator *iterator;
1560 
1561   iterator = g_new0 (AsyncIterator, 1);
1562   iterator->iterator = iterator_cb;
1563   iterator->done_cb = done_cb;
1564   iterator->task = task;
1565 
1566   async_iterate (iterator);
1567 }
1568 
1569 static void
async_write_done(GTask * task)1570 async_write_done (GTask *task)
1571 {
1572   WriteOperation *op;
1573   gssize count_written;
1574   GError *error;
1575 
1576   op = g_task_get_task_data (task);
1577 
1578   count_written = op->ret_val;
1579   error = op->ret_error;
1580 
1581   if (count_written == -1)
1582     g_task_return_error (task, error);
1583   else
1584     g_task_return_int (task, count_written);
1585 
1586   g_object_unref (task);
1587 }
1588 
1589 static void
g_daemon_file_output_stream_write_async(GOutputStream * stream,const void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer data)1590 g_daemon_file_output_stream_write_async  (GOutputStream      *stream,
1591 					  const void         *buffer,
1592 					  gsize               count,
1593 					  int                 io_priority,
1594 					  GCancellable       *cancellable,
1595 					  GAsyncReadyCallback callback,
1596 					  gpointer            data)
1597 {
1598   WriteOperation *op;
1599   GTask *task;
1600 
1601   task = g_task_new (stream, cancellable, callback, data);
1602   g_task_set_priority (task, io_priority);
1603   g_task_set_source_tag (task, g_daemon_file_output_stream_write_async);
1604 
1605   /* Limit for sanity and to avoid 32bit overflow */
1606   if (count > MAX_WRITE_SIZE)
1607     count = MAX_WRITE_SIZE;
1608 
1609   op = g_new0 (WriteOperation, 1);
1610   op->state = WRITE_STATE_INIT;
1611   op->buffer = buffer;
1612   op->buffer_size = count;
1613 
1614   g_task_set_task_data (task, op, g_free);
1615 
1616   run_async_state_machine (task,
1617 			   (state_machine_iterator)iterate_write_state_machine,
1618 			   async_write_done);
1619 }
1620 
1621 static gssize
g_daemon_file_output_stream_write_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)1622 g_daemon_file_output_stream_write_finish (GOutputStream             *stream,
1623 					  GAsyncResult              *result,
1624 					  GError                   **error)
1625 {
1626   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1627   g_return_val_if_fail (g_async_result_is_tagged (result, g_daemon_file_output_stream_write_async), -1);
1628 
1629   return g_task_propagate_int (G_TASK (result), error);
1630 }
1631 
1632 static void
async_close_done(GTask * task)1633 async_close_done (GTask *task)
1634 {
1635   GDaemonFileOutputStream *file;
1636   CloseOperation *op;
1637   gboolean result;
1638   GError *error;
1639   GCancellable *cancellable = g_task_get_cancellable (task);
1640 
1641   file = G_DAEMON_FILE_OUTPUT_STREAM (g_task_get_source_object (task));
1642   op = g_task_get_task_data (task);
1643 
1644   result = op->ret_val;
1645   error = op->ret_error;
1646 
1647   if (result)
1648     result = g_output_stream_close (file->command_stream, cancellable, &error);
1649   else
1650     g_output_stream_close (file->command_stream, cancellable, NULL);
1651 
1652   if (result)
1653     result = g_input_stream_close (file->data_stream, cancellable, &error);
1654   else
1655     g_input_stream_close (file->data_stream, cancellable, NULL);
1656 
1657   if (!result)
1658     g_task_return_error (task, error);
1659   else
1660     g_task_return_boolean (task, TRUE);
1661 
1662   g_object_unref (task);
1663 }
1664 
1665 static void
g_daemon_file_output_stream_close_async(GOutputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer data)1666 g_daemon_file_output_stream_close_async (GOutputStream     *stream,
1667 					int                 io_priority,
1668 					GCancellable       *cancellable,
1669 					GAsyncReadyCallback callback,
1670 					gpointer            data)
1671 {
1672   CloseOperation *op;
1673   GTask *task;
1674 
1675   task = g_task_new (stream, cancellable, callback, data);
1676   g_task_set_priority (task, io_priority);
1677   g_task_set_source_tag (task, g_daemon_file_output_stream_close_async);
1678 
1679   op = g_new0 (CloseOperation, 1);
1680   op->state = CLOSE_STATE_INIT;
1681 
1682   g_task_set_task_data (task, op, g_free);
1683 
1684   run_async_state_machine (task,
1685 			   (state_machine_iterator)iterate_close_state_machine,
1686 			   async_close_done);
1687 }
1688 
1689 static gboolean
g_daemon_file_output_stream_close_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)1690 g_daemon_file_output_stream_close_finish (GOutputStream             *stream,
1691 					  GAsyncResult              *result,
1692 					  GError                   **error)
1693 {
1694   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1695   g_return_val_if_fail (g_async_result_is_tagged (result, g_daemon_file_output_stream_close_async), FALSE);
1696 
1697   return g_task_propagate_boolean (G_TASK (result), error);
1698 }
1699 
1700 static void
async_query_done(GTask * task)1701 async_query_done (GTask *task)
1702 {
1703   QueryOperation *op;
1704   GFileInfo *info;
1705   GError *error;
1706 
1707   op = g_task_get_task_data (task);
1708 
1709   info = op->info;
1710   error = op->ret_error;
1711 
1712   if (info == NULL)
1713     g_task_return_error (task, error);
1714   else
1715     g_task_return_pointer (task, info, g_object_unref);
1716 
1717   g_object_unref (task);
1718 }
1719 
1720 static void
g_daemon_file_output_stream_query_info_async(GFileOutputStream * stream,const char * attributes,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1721 g_daemon_file_output_stream_query_info_async  (GFileOutputStream    *stream,
1722 					       const char           *attributes,
1723 					       int                   io_priority,
1724 					       GCancellable         *cancellable,
1725 					       GAsyncReadyCallback   callback,
1726 					       gpointer              user_data)
1727 {
1728   QueryOperation *op;
1729   GTask *task;
1730 
1731   task = g_task_new (stream, cancellable, callback, user_data);
1732   g_task_set_priority (task, io_priority);
1733   g_task_set_source_tag (task, g_daemon_file_output_stream_query_info_async);
1734 
1735   op = g_new0 (QueryOperation, 1);
1736   op->state = QUERY_STATE_INIT;
1737   if (attributes)
1738     op->attributes = g_strdup (attributes);
1739   else
1740     op->attributes = g_strdup ("");
1741 
1742   g_task_set_task_data (task, op, (GDestroyNotify)query_operation_free);
1743 
1744   run_async_state_machine (task,
1745 			   (state_machine_iterator)iterate_query_state_machine,
1746 			   async_query_done);
1747 }
1748 
1749 static GFileInfo *
g_daemon_file_output_stream_query_info_finish(GFileOutputStream * stream,GAsyncResult * result,GError ** error)1750 g_daemon_file_output_stream_query_info_finish (GFileOutputStream     *stream,
1751 					       GAsyncResult         *result,
1752 					       GError              **error)
1753 {
1754   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
1755   g_return_val_if_fail (g_async_result_is_tagged (result, g_daemon_file_output_stream_query_info_async), NULL);
1756 
1757   return g_task_propagate_pointer (G_TASK (result), error);
1758 }
1759