1 /* GIO - GLib Input, Output and Streaming Library
2 *
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 *
20 * Author: Alexander Larsson <alexl@redhat.com>
21 */
22
23 #include <config.h>
24
25 #include <sys/types.h>
26 #include <sys/stat.h>
27 #include <fcntl.h>
28 #include <unistd.h>
29 #include <errno.h>
30 #include <string.h>
31 #include <sys/socket.h>
32 #include <sys/un.h>
33
34 #include <glib.h>
35 #include <glib/gstdio.h>
36 #include <glib/gi18n-lib.h>
37 #include <gio/gio.h>
38 #include <gio/gunixinputstream.h>
39 #include <gio/gunixoutputstream.h>
40 #include "gdaemonfileoutputstream.h"
41 #include "gvfsdaemondbus.h"
42 #include <gvfsdaemonprotocol.h>
43 #include <gvfsfileinfo.h>
44
/* Largest payload sent in a single WRITE request; longer writes are
 * clamped to this size in g_daemon_file_output_stream_write(). */
#define MAX_WRITE_SIZE (4*1024*1024)
46
/* What a state-machine iterator asks its driver to do next
 * (see run_sync_state_machine). */
typedef enum {
  STATE_OP_DONE,  /* operation finished; stop iterating */
  STATE_OP_READ,  /* read up to io_size bytes from the data stream into io_buffer */
  STATE_OP_WRITE, /* write up to io_size bytes from io_buffer to the command stream */
  STATE_OP_SKIP   /* skip up to io_size bytes on the data stream */
} StateOp;
53
/* States of iterate_write_state_machine(). */
typedef enum {
  WRITE_STATE_INIT = 0,      /* nothing queued yet; append the WRITE request */
  WRITE_STATE_WROTE_COMMAND, /* a write of output_buffer has returned */
  WRITE_STATE_SEND_DATA,     /* sending the caller's payload */
  WRITE_STATE_HANDLE_INPUT   /* reading and decoding replies */
} WriteState;
60
/* Per-call state of one write operation. */
typedef struct {
  WriteState state;

  /* Output: payload supplied by the caller */
  const char *buffer;
  gsize buffer_size;
  gsize buffer_pos;     /* number of payload bytes already written */

  /* Input: result of the operation */
  gssize ret_val;       /* arg1 of the WRITTEN reply, or -1 on error/cancel */
  GError *ret_error;    /* set when ret_val is -1 */

  gboolean sent_cancel; /* TRUE once a CANCEL request has been queued */

  guint32 seq_nr;       /* sequence number assigned to the WRITE request */
} WriteOperation;
77
/* States of iterate_seek_state_machine(). */
typedef enum {
  SEEK_STATE_INIT = 0,      /* nothing queued yet; append the seek request */
  SEEK_STATE_WROTE_REQUEST, /* a write of output_buffer has returned */
  SEEK_STATE_HANDLE_INPUT   /* reading and decoding replies */
} SeekState;
83
/* Per-call state of one seek operation. */
typedef struct {
  SeekState state;

  /* Output: what the caller asked for */
  goffset offset;       /* made absolute for G_SEEK_CUR before being sent */
  GSeekType seek_type;
  /* Input: result of the operation */
  gboolean ret_val;     /* TRUE after a SEEK_POS reply, FALSE after an ERROR reply */
  GError *ret_error;
  goffset ret_offset;   /* position reported by the SEEK_POS reply */

  gboolean sent_cancel; /* TRUE once a CANCEL request has been queued */

  guint32 seq_nr;       /* sequence number assigned to the seek request */
} SeekOperation;
99
/* States of iterate_truncate_state_machine(). */
typedef enum {
  TRUNCATE_STATE_INIT = 0,      /* nothing queued yet; append the TRUNCATE request */
  TRUNCATE_STATE_WROTE_REQUEST, /* a write of output_buffer has returned */
  TRUNCATE_STATE_HANDLE_INPUT   /* reading and decoding replies */
} TruncateState;
105
/* Per-call state of one truncate operation. */
typedef struct {
  TruncateState state;

  /* Output */
  goffset size;         /* sent as the low/high 32 bits of the TRUNCATE request */
  /* Input */
  gboolean ret_val;     /* FALSE on an ERROR reply or cancellation */
  GError *ret_error;

  gboolean sent_cancel; /* TRUE once a CANCEL request has been queued */

  guint32 seq_nr;       /* sequence number assigned to the TRUNCATE request */
} TruncateOperation;
119
/* States of iterate_close_state_machine(). */
typedef enum {
  CLOSE_STATE_INIT = 0,      /* nothing queued yet; append the CLOSE request */
  CLOSE_STATE_WROTE_REQUEST, /* a write of output_buffer has returned */
  CLOSE_STATE_HANDLE_INPUT   /* reading and decoding replies */
} CloseState;
125
/* Per-call state of one close operation. */
typedef struct {
  CloseState state;

  /* Output: the CLOSE request carries no arguments */

  /* Input: result of the operation */
  gboolean ret_val;     /* TRUE after a CLOSED reply, FALSE on error/cancel */
  GError *ret_error;

  gboolean sent_cancel; /* TRUE once a CANCEL request has been queued */

  guint32 seq_nr;       /* sequence number assigned to the CLOSE request */
} CloseOperation;
139
/* States of the query-info state machine; named like the
 * seek/close/truncate states. */
typedef enum {
  QUERY_STATE_INIT = 0,
  QUERY_STATE_WROTE_REQUEST,
  QUERY_STATE_HANDLE_INPUT,
} QueryState;
145
/* Per-call state of one query-info operation.
 * NOTE(review): "Input"/"Output" here appear to be from the caller's
 * point of view, the reverse of the other *Operation structs - confirm. */
typedef struct {
  QueryState state;

  /* Input */
  char *attributes;     /* freed by query_operation_free() */

  /* Output */
  GFileInfo *info;
  GError *ret_error;

  gboolean sent_cancel;

  guint32 seq_nr;
} QueryOperation;
160
/* I/O request and result passed back and forth between a state-machine
 * iterator and the driver that performs the actual I/O. */
typedef struct {
  gboolean cancelled;       /* refreshed from the GCancellable before each iteration */

  char *io_buffer;          /* buffer for the requested read/write */
  gsize io_size;            /* number of bytes requested */
  gsize io_res;             /* bytes transferred by the last I/O; 0 if it was cancelled */
  /* The operation always succeeds, or gets cancelled.
     If we get an error doing the i/o that is considered fatal */
  gboolean io_allow_cancel; /* whether the I/O call is given the cancellable */
  gboolean io_cancelled;    /* TRUE if the last I/O call failed with G_IO_ERROR_CANCELLED */
} IOOperationData;
172
/* One step of an operation's state machine: consumes the result of the
 * previous I/O from io_op, fills in the next I/O to do and returns its kind.
 * `data` is the operation-specific struct (WriteOperation, SeekOperation, ...). */
typedef StateOp (*state_machine_iterator) (GDaemonFileOutputStream *file, IOOperationData *io_op, gpointer data);
174
/* Instance data of the daemon-backed file output stream. */
struct _GDaemonFileOutputStream {
  GFileOutputStream parent;

  GOutputStream *command_stream; /* requests and payload are written here */
  GInputStream *data_stream;     /* replies are read from here */
  gboolean can_seek;             /* from OPEN_FOR_WRITE_FLAG_CAN_SEEK */
  gboolean can_truncate;         /* from OPEN_FOR_WRITE_FLAG_CAN_TRUNCATE */

  guint32 seq_nr;                /* sequence number for the next request; starts at 1 */
  goffset current_offset;        /* position reported by tell() */

  gsize input_block_size;
  GString *input_buffer;         /* bytes of the reply currently being received */

  GString *output_buffer;        /* request bytes not yet written */

  char *etag;                    /* taken from the CLOSED reply, if it carried one */

};
194
/* Forward declarations of the virtual-function implementations that
 * g_daemon_file_output_stream_class_init() installs. */
static gssize     g_daemon_file_output_stream_write             (GOutputStream        *stream,
								 const void           *buffer,
								 gsize                 count,
								 GCancellable         *cancellable,
								 GError              **error);
static gboolean   g_daemon_file_output_stream_close             (GOutputStream        *stream,
								 GCancellable         *cancellable,
								 GError              **error);
static GFileInfo *g_daemon_file_output_stream_query_info        (GFileOutputStream    *stream,
								 const char           *attributes,
								 GCancellable         *cancellable,
								 GError              **error);
static char      *g_daemon_file_output_stream_get_etag          (GFileOutputStream    *stream);
static goffset    g_daemon_file_output_stream_tell              (GFileOutputStream    *stream);
static gboolean   g_daemon_file_output_stream_can_seek          (GFileOutputStream    *stream);
static gboolean   g_daemon_file_output_stream_seek              (GFileOutputStream    *stream,
								 goffset               offset,
								 GSeekType             type,
								 GCancellable         *cancellable,
								 GError              **error);
static gboolean   g_daemon_file_output_stream_can_truncate      (GFileOutputStream    *stream);
static gboolean   g_daemon_file_output_stream_truncate          (GFileOutputStream    *stream,
								 goffset               size,
								 GCancellable         *cancellable,
								 GError              **error);
static void       g_daemon_file_output_stream_write_async       (GOutputStream        *stream,
								 const void           *buffer,
								 gsize                 count,
								 int                   io_priority,
								 GCancellable         *cancellable,
								 GAsyncReadyCallback   callback,
								 gpointer              data);
static gssize     g_daemon_file_output_stream_write_finish      (GOutputStream        *stream,
								 GAsyncResult         *result,
								 GError              **error);
static void       g_daemon_file_output_stream_close_async       (GOutputStream        *stream,
								 int                   io_priority,
								 GCancellable         *cancellable,
								 GAsyncReadyCallback   callback,
								 gpointer              data);
static gboolean   g_daemon_file_output_stream_close_finish      (GOutputStream        *stream,
								 GAsyncResult         *result,
								 GError              **error);
static void       g_daemon_file_output_stream_query_info_async  (GFileOutputStream    *stream,
								 const char           *attributes,
								 int                   io_priority,
								 GCancellable         *cancellable,
								 GAsyncReadyCallback   callback,
								 gpointer              user_data);
static GFileInfo *g_daemon_file_output_stream_query_info_finish (GFileOutputStream    *stream,
								 GAsyncResult         *result,
								 GError              **error);
247
248
249
/* Registers GDaemonFileOutputStream as a subtype of G_TYPE_FILE_OUTPUT_STREAM
 * and provides g_daemon_file_output_stream_parent_class used by finalize. */
G_DEFINE_TYPE (GDaemonFileOutputStream, g_daemon_file_output_stream,
	       G_TYPE_FILE_OUTPUT_STREAM)
252
253 static void
254 query_operation_free (QueryOperation *op)
255 {
256 g_free (op->attributes);
257 g_free (op);
258 }
259
260 static void
g_string_remove_in_front(GString * string,gsize bytes)261 g_string_remove_in_front (GString *string,
262 gsize bytes)
263 {
264 memmove (string->str,
265 string->str + bytes,
266 string->len - bytes);
267 g_string_truncate (string,
268 string->len - bytes);
269 }
270
271 static void
g_daemon_file_output_stream_finalize(GObject * object)272 g_daemon_file_output_stream_finalize (GObject *object)
273 {
274 GDaemonFileOutputStream *file;
275
276 file = G_DAEMON_FILE_OUTPUT_STREAM (object);
277
278 if (file->command_stream)
279 g_object_unref (file->command_stream);
280 if (file->data_stream)
281 g_object_unref (file->data_stream);
282
283 g_string_free (file->input_buffer, TRUE);
284 g_string_free (file->output_buffer, TRUE);
285
286 g_free (file->etag);
287
288 if (G_OBJECT_CLASS (g_daemon_file_output_stream_parent_class)->finalize)
289 (*G_OBJECT_CLASS (g_daemon_file_output_stream_parent_class)->finalize) (object);
290 }
291
292 static void
g_daemon_file_output_stream_class_init(GDaemonFileOutputStreamClass * klass)293 g_daemon_file_output_stream_class_init (GDaemonFileOutputStreamClass *klass)
294 {
295 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
296 GOutputStreamClass *stream_class = G_OUTPUT_STREAM_CLASS (klass);
297 GFileOutputStreamClass *file_stream_class = G_FILE_OUTPUT_STREAM_CLASS (klass);
298
299 gobject_class->finalize = g_daemon_file_output_stream_finalize;
300
301 stream_class->write_fn = g_daemon_file_output_stream_write;
302 stream_class->close_fn = g_daemon_file_output_stream_close;
303
304 stream_class->write_async = g_daemon_file_output_stream_write_async;
305 stream_class->write_finish = g_daemon_file_output_stream_write_finish;
306 stream_class->close_async = g_daemon_file_output_stream_close_async;
307 stream_class->close_finish = g_daemon_file_output_stream_close_finish;
308
309 file_stream_class->tell = g_daemon_file_output_stream_tell;
310 file_stream_class->can_seek = g_daemon_file_output_stream_can_seek;
311 file_stream_class->seek = g_daemon_file_output_stream_seek;
312 file_stream_class->can_truncate = g_daemon_file_output_stream_can_truncate;
313 file_stream_class->truncate_fn = g_daemon_file_output_stream_truncate;
314 file_stream_class->query_info = g_daemon_file_output_stream_query_info;
315 file_stream_class->get_etag = g_daemon_file_output_stream_get_etag;
316 file_stream_class->query_info_async = g_daemon_file_output_stream_query_info_async;
317 file_stream_class->query_info_finish = g_daemon_file_output_stream_query_info_finish;
318 }
319
320 static void
g_daemon_file_output_stream_init(GDaemonFileOutputStream * info)321 g_daemon_file_output_stream_init (GDaemonFileOutputStream *info)
322 {
323 info->output_buffer = g_string_new ("");
324 info->input_buffer = g_string_new ("");
325 info->seq_nr = 1;
326 }
327
328 GFileOutputStream *
g_daemon_file_output_stream_new(int fd,guint32 flags,goffset initial_offset)329 g_daemon_file_output_stream_new (int fd,
330 guint32 flags,
331 goffset initial_offset)
332 {
333 GDaemonFileOutputStream *stream;
334
335 stream = g_object_new (G_TYPE_DAEMON_FILE_OUTPUT_STREAM, NULL);
336
337 stream->command_stream = g_unix_output_stream_new (fd, FALSE);
338 stream->data_stream = g_unix_input_stream_new (fd, TRUE);
339 stream->can_seek = flags & OPEN_FOR_WRITE_FLAG_CAN_SEEK;
340 stream->can_truncate = flags & OPEN_FOR_WRITE_FLAG_CAN_TRUNCATE;
341 stream->current_offset = initial_offset;
342
343 return G_FILE_OUTPUT_STREAM (stream);
344 }
345
346 static gboolean
error_is_cancel(GError * error)347 error_is_cancel (GError *error)
348 {
349 return error != NULL &&
350 error->domain == G_IO_ERROR &&
351 error->code == G_IO_ERROR_CANCELLED;
352 }
353
354 static void
unappend_request(GDaemonFileOutputStream * stream)355 unappend_request (GDaemonFileOutputStream *stream)
356 {
357 g_assert (stream->output_buffer->len >= G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE);
358 stream->seq_nr--;
359 g_string_truncate (stream->output_buffer,
360 stream->output_buffer->len - G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE);
361 }
362
363 static void
append_request(GDaemonFileOutputStream * stream,guint32 command,guint32 arg1,guint32 arg2,guint32 data_len,guint32 * seq_nr)364 append_request (GDaemonFileOutputStream *stream, guint32 command,
365 guint32 arg1, guint32 arg2, guint32 data_len, guint32 *seq_nr)
366 {
367 GVfsDaemonSocketProtocolRequest cmd;
368
369 g_assert (sizeof (cmd) == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE);
370
371 if (seq_nr)
372 *seq_nr = stream->seq_nr;
373
374 cmd.command = g_htonl (command);
375 cmd.seq_nr = g_htonl (stream->seq_nr);
376 cmd.arg1 = g_htonl (arg1);
377 cmd.arg2 = g_htonl (arg2);
378 cmd.data_len = g_htonl (data_len);
379
380 stream->seq_nr++;
381
382 g_string_append_len (stream->output_buffer,
383 (char *)&cmd, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE);
384 }
385
386 static gsize
get_reply_header_missing_bytes(GString * buffer)387 get_reply_header_missing_bytes (GString *buffer)
388 {
389 GVfsDaemonSocketProtocolReply *reply;
390 guint32 type;
391 guint32 arg2;
392
393 if (buffer->len < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE)
394 return G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE - buffer->len;
395
396 reply = (GVfsDaemonSocketProtocolReply *)buffer->str;
397
398 type = g_ntohl (reply->type);
399 arg2 = g_ntohl (reply->arg2);
400
401 /* ERROR, CLOSED and INFO has extra data w/ len in arg2 */
402 if (type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR ||
403 type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED ||
404 type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_INFO)
405 return G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE + arg2 - buffer->len;
406 return 0;
407 }
408
409 static char *
decode_reply(GString * buffer,GVfsDaemonSocketProtocolReply * reply_out)410 decode_reply (GString *buffer, GVfsDaemonSocketProtocolReply *reply_out)
411 {
412 GVfsDaemonSocketProtocolReply *reply;
413 reply = (GVfsDaemonSocketProtocolReply *)buffer->str;
414 reply_out->type = g_ntohl (reply->type);
415 reply_out->seq_nr = g_ntohl (reply->seq_nr);
416 reply_out->arg1 = g_ntohl (reply->arg1);
417 reply_out->arg2 = g_ntohl (reply->arg2);
418
419 return buffer->str + G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE;
420 }
421
422 static void
decode_error(GVfsDaemonSocketProtocolReply * reply,char * data,GError ** error)423 decode_error (GVfsDaemonSocketProtocolReply *reply, char *data, GError **error)
424 {
425 g_set_error_literal (error,
426 g_quark_from_string (data),
427 reply->arg1,
428 data + strlen (data) + 1);
429 }
430
431
432 static gboolean
run_sync_state_machine(GDaemonFileOutputStream * file,state_machine_iterator iterator,gpointer data,GCancellable * cancellable,GError ** error)433 run_sync_state_machine (GDaemonFileOutputStream *file,
434 state_machine_iterator iterator,
435 gpointer data,
436 GCancellable *cancellable,
437 GError **error)
438 {
439 gssize res;
440 StateOp io_op;
441 IOOperationData io_data;
442 GError *io_error;
443
444 memset (&io_data, 0, sizeof (io_data));
445
446 while (TRUE)
447 {
448 if (cancellable)
449 io_data.cancelled = g_cancellable_is_cancelled (cancellable);
450
451 io_op = iterator (file, &io_data, data);
452
453 if (io_op == STATE_OP_DONE)
454 return TRUE;
455
456 io_error = NULL;
457 if (io_op == STATE_OP_READ)
458 {
459 res = g_input_stream_read (file->data_stream,
460 io_data.io_buffer, io_data.io_size,
461 io_data.io_allow_cancel ? cancellable : NULL,
462 &io_error);
463 }
464 else if (io_op == STATE_OP_SKIP)
465 {
466 res = g_input_stream_skip (file->data_stream,
467 io_data.io_size,
468 io_data.io_allow_cancel ? cancellable : NULL,
469 &io_error);
470 }
471 else if (io_op == STATE_OP_WRITE)
472 {
473 res = g_output_stream_write (file->command_stream,
474 io_data.io_buffer, io_data.io_size,
475 io_data.io_allow_cancel ? cancellable : NULL,
476 &io_error);
477 }
478 else
479 {
480 res = 0;
481 g_assert_not_reached ();
482 }
483
484 if (res == -1)
485 {
486 if (error_is_cancel (io_error))
487 {
488 io_data.io_res = 0;
489 io_data.io_cancelled = TRUE;
490 g_error_free (io_error);
491 }
492 else
493 {
494 g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
495 _("Error in stream protocol: %s"), io_error->message);
496 g_error_free (io_error);
497 return FALSE;
498 }
499 }
500 else if (res == 0 && io_data.io_size != 0)
501 {
502 g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
503 _("Error in stream protocol: %s"), _("End of stream"));
504 return FALSE;
505 }
506 else
507 {
508 io_data.io_res = res;
509 io_data.io_cancelled = FALSE;
510 }
511 }
512 }
513
/* write cycle:

   create a WRITE request packet, append it to the outgoing buffer
   flush the outgoing buffer, then send the caller's data
   start processing input, looking for a WRITTEN reply
    or an error with the same seq nr
   on cancel, send a cancel command and go back to the loop
*/
523
524 static StateOp
iterate_write_state_machine(GDaemonFileOutputStream * file,IOOperationData * io_op,WriteOperation * op)525 iterate_write_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, WriteOperation *op)
526 {
527 gsize len;
528
529 while (TRUE)
530 {
531 switch (op->state)
532 {
533 /* Initial state for read op */
534 case WRITE_STATE_INIT:
535 append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_WRITE,
536 op->buffer_size, 0, op->buffer_size, &op->seq_nr);
537 op->state = WRITE_STATE_WROTE_COMMAND;
538 io_op->io_buffer = file->output_buffer->str;
539 io_op->io_size = file->output_buffer->len;
540 io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
541 return STATE_OP_WRITE;
542
543 /* wrote parts of output_buffer */
544 case WRITE_STATE_WROTE_COMMAND:
545 if (io_op->io_cancelled)
546 {
547 if (!op->sent_cancel)
548 unappend_request (file);
549 op->ret_val = -1;
550 g_set_error_literal (&op->ret_error,
551 G_IO_ERROR,
552 G_IO_ERROR_CANCELLED,
553 _("Operation was cancelled"));
554 return STATE_OP_DONE;
555 }
556
557 if (io_op->io_res < file->output_buffer->len)
558 {
559 g_string_remove_in_front (file->output_buffer,
560 io_op->io_res);
561 io_op->io_buffer = file->output_buffer->str;
562 io_op->io_size = file->output_buffer->len;
563 io_op->io_allow_cancel = FALSE;
564 return STATE_OP_WRITE;
565 }
566 g_string_truncate (file->output_buffer, 0);
567
568 op->buffer_pos = 0;
569 if (op->sent_cancel)
570 op->state = WRITE_STATE_HANDLE_INPUT;
571 else
572 op->state = WRITE_STATE_SEND_DATA;
573 break;
574
575 /* No op */
576 case WRITE_STATE_SEND_DATA:
577 op->buffer_pos += io_op->io_res;
578
579 if (op->buffer_pos < op->buffer_size)
580 {
581 io_op->io_buffer = (char *)(op->buffer + op->buffer_pos);
582 io_op->io_size = op->buffer_size - op->buffer_pos;
583 io_op->io_allow_cancel = FALSE;
584 return STATE_OP_WRITE;
585 }
586
587 op->state = WRITE_STATE_HANDLE_INPUT;
588 break;
589
590 /* No op */
591 case WRITE_STATE_HANDLE_INPUT:
592 if (io_op->cancelled && !op->sent_cancel)
593 {
594 op->sent_cancel = TRUE;
595 append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
596 op->seq_nr, 0, 0, NULL);
597 op->state = WRITE_STATE_WROTE_COMMAND;
598 io_op->io_buffer = file->output_buffer->str;
599 io_op->io_size = file->output_buffer->len;
600 io_op->io_allow_cancel = FALSE;
601 return STATE_OP_WRITE;
602 }
603
604 if (io_op->io_res > 0)
605 {
606 gsize unread_size = io_op->io_size - io_op->io_res;
607 g_string_set_size (file->input_buffer,
608 file->input_buffer->len - unread_size);
609 }
610
611 len = get_reply_header_missing_bytes (file->input_buffer);
612 if (len > 0)
613 {
614 gsize current_len = file->input_buffer->len;
615 g_string_set_size (file->input_buffer,
616 current_len + len);
617 io_op->io_buffer = file->input_buffer->str + current_len;
618 io_op->io_size = len;
619 io_op->io_allow_cancel = !op->sent_cancel;
620 return STATE_OP_READ;
621 }
622
623 /* Got full header */
624
625 {
626 GVfsDaemonSocketProtocolReply reply;
627 char *data;
628 data = decode_reply (file->input_buffer, &reply);
629
630 if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
631 reply.seq_nr == op->seq_nr)
632 {
633 op->ret_val = -1;
634 decode_error (&reply, data, &op->ret_error);
635 g_string_truncate (file->input_buffer, 0);
636 return STATE_OP_DONE;
637 }
638 else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_WRITTEN &&
639 reply.seq_nr == op->seq_nr)
640 {
641 op->ret_val = reply.arg1;
642 g_string_truncate (file->input_buffer, 0);
643 return STATE_OP_DONE;
644 }
645 /* Ignore other reply types */
646 }
647
648 g_string_truncate (file->input_buffer, 0);
649
650 /* This wasn't interesting, read next reply */
651 op->state = WRITE_STATE_HANDLE_INPUT;
652 break;
653
654 default:
655 g_assert_not_reached ();
656 }
657
658 /* Clear io_op between non-op state switches */
659 io_op->io_size = 0;
660 io_op->io_res = 0;
661 io_op->io_cancelled = FALSE;
662
663 }
664 }
665
666 static gssize
g_daemon_file_output_stream_write(GOutputStream * stream,const void * buffer,gsize count,GCancellable * cancellable,GError ** error)667 g_daemon_file_output_stream_write (GOutputStream *stream,
668 const void *buffer,
669 gsize count,
670 GCancellable *cancellable,
671 GError **error)
672 {
673 GDaemonFileOutputStream *file;
674 WriteOperation op;
675
676 file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
677
678 if (g_cancellable_set_error_if_cancelled (cancellable, error))
679 return -1;
680
681 /* Limit for sanity and to avoid 32bit overflow */
682 if (count > MAX_WRITE_SIZE)
683 count = MAX_WRITE_SIZE;
684
685 memset (&op, 0, sizeof (op));
686 op.state = WRITE_STATE_INIT;
687 op.buffer = buffer;
688 op.buffer_size = count;
689
690 if (!run_sync_state_machine (file, (state_machine_iterator)iterate_write_state_machine,
691 &op, cancellable, error))
692 return -1; /* IO Error */
693
694 if (op.ret_val == -1)
695 g_propagate_error (error, op.ret_error);
696 else
697 file->current_offset += op.ret_val;
698
699 return op.ret_val;
700 }
701
702 static StateOp
iterate_close_state_machine(GDaemonFileOutputStream * file,IOOperationData * io_op,CloseOperation * op)703 iterate_close_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, CloseOperation *op)
704 {
705 gsize len;
706
707 while (TRUE)
708 {
709 switch (op->state)
710 {
711 /* Initial state for read op */
712 case CLOSE_STATE_INIT:
713 append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CLOSE,
714 0, 0, 0, &op->seq_nr);
715 op->state = CLOSE_STATE_WROTE_REQUEST;
716 io_op->io_buffer = file->output_buffer->str;
717 io_op->io_size = file->output_buffer->len;
718 io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
719 return STATE_OP_WRITE;
720
721 /* wrote parts of output_buffer */
722 case CLOSE_STATE_WROTE_REQUEST:
723 if (io_op->io_cancelled)
724 {
725 if (!op->sent_cancel)
726 unappend_request (file);
727 op->ret_val = FALSE;
728 g_set_error_literal (&op->ret_error,
729 G_IO_ERROR,
730 G_IO_ERROR_CANCELLED,
731 _("Operation was cancelled"));
732 return STATE_OP_DONE;
733 }
734
735 if (io_op->io_res < file->output_buffer->len)
736 {
737 g_string_remove_in_front (file->output_buffer,
738 io_op->io_res);
739 io_op->io_buffer = file->output_buffer->str;
740 io_op->io_size = file->output_buffer->len;
741 io_op->io_allow_cancel = FALSE;
742 return STATE_OP_WRITE;
743 }
744 g_string_truncate (file->output_buffer, 0);
745
746 op->state = CLOSE_STATE_HANDLE_INPUT;
747 break;
748
749 /* No op */
750 case CLOSE_STATE_HANDLE_INPUT:
751 if (io_op->cancelled && !op->sent_cancel)
752 {
753 op->sent_cancel = TRUE;
754 append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
755 op->seq_nr, 0, 0, NULL);
756 op->state = CLOSE_STATE_WROTE_REQUEST;
757 io_op->io_buffer = file->output_buffer->str;
758 io_op->io_size = file->output_buffer->len;
759 io_op->io_allow_cancel = FALSE;
760 return STATE_OP_WRITE;
761 }
762
763 if (io_op->io_res > 0)
764 {
765 gsize unread_size = io_op->io_size - io_op->io_res;
766 g_string_set_size (file->input_buffer,
767 file->input_buffer->len - unread_size);
768 }
769
770 len = get_reply_header_missing_bytes (file->input_buffer);
771 if (len > 0)
772 {
773 gsize current_len = file->input_buffer->len;
774 g_string_set_size (file->input_buffer,
775 current_len + len);
776 io_op->io_buffer = file->input_buffer->str + current_len;
777 io_op->io_size = len;
778 io_op->io_allow_cancel = !op->sent_cancel;
779 return STATE_OP_READ;
780 }
781
782 /* Got full header */
783
784 {
785 GVfsDaemonSocketProtocolReply reply;
786 char *data;
787 data = decode_reply (file->input_buffer, &reply);
788
789 if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
790 reply.seq_nr == op->seq_nr)
791 {
792 op->ret_val = FALSE;
793 decode_error (&reply, data, &op->ret_error);
794 g_string_truncate (file->input_buffer, 0);
795 return STATE_OP_DONE;
796 }
797 else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED &&
798 reply.seq_nr == op->seq_nr)
799 {
800 op->ret_val = TRUE;
801 if (reply.arg2 > 0)
802 file->etag = g_strndup (data, reply.arg2);
803 g_string_truncate (file->input_buffer, 0);
804 return STATE_OP_DONE;
805 }
806 /* Ignore other reply types */
807 }
808
809 g_string_truncate (file->input_buffer, 0);
810
811 /* This wasn't interesting, read next reply */
812 op->state = CLOSE_STATE_HANDLE_INPUT;
813 break;
814
815 default:
816 g_assert_not_reached ();
817 }
818
819 /* Clear io_op between non-op state switches */
820 io_op->io_size = 0;
821 io_op->io_res = 0;
822 io_op->io_cancelled = FALSE;
823 }
824 }
825
826
827 static gboolean
g_daemon_file_output_stream_close(GOutputStream * stream,GCancellable * cancellable,GError ** error)828 g_daemon_file_output_stream_close (GOutputStream *stream,
829 GCancellable *cancellable,
830 GError **error)
831 {
832 GDaemonFileOutputStream *file;
833 CloseOperation op;
834 gboolean res;
835
836 file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
837
838 /* We need to do a full roundtrip to guarantee that the writes have
839 reached the disk. */
840
841 memset (&op, 0, sizeof (op));
842 op.state = CLOSE_STATE_INIT;
843
844 if (!run_sync_state_machine (file, (state_machine_iterator)iterate_close_state_machine,
845 &op, cancellable, error))
846 res = FALSE;
847 else
848 {
849 if (!op.ret_val)
850 g_propagate_error (error, op.ret_error);
851 res = op.ret_val;
852 }
853
854 /* Return the first error, but close all streams */
855 if (res)
856 res = g_output_stream_close (file->command_stream, cancellable, error);
857 else
858 g_output_stream_close (file->command_stream, cancellable, NULL);
859
860 if (res)
861 res = g_input_stream_close (file->data_stream, cancellable, error);
862 else
863 g_input_stream_close (file->data_stream, cancellable, NULL);
864
865 return res;
866 }
867
868 static goffset
g_daemon_file_output_stream_tell(GFileOutputStream * stream)869 g_daemon_file_output_stream_tell (GFileOutputStream *stream)
870 {
871 GDaemonFileOutputStream *file;
872
873 file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
874
875 return file->current_offset;
876 }
877
878 static gboolean
g_daemon_file_output_stream_can_seek(GFileOutputStream * stream)879 g_daemon_file_output_stream_can_seek (GFileOutputStream *stream)
880 {
881 GDaemonFileOutputStream *file;
882
883 file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
884
885 return file->can_seek;
886 }
887
888 static StateOp
iterate_seek_state_machine(GDaemonFileOutputStream * file,IOOperationData * io_op,SeekOperation * op)889 iterate_seek_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, SeekOperation *op)
890 {
891 gsize len;
892 guint32 request;
893
894 while (TRUE)
895 {
896 switch (op->state)
897 {
898 /* Initial state for read op */
899 case SEEK_STATE_INIT:
900 request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_SET;
901 if (op->seek_type == G_SEEK_CUR)
902 op->offset = file->current_offset + op->offset;
903 else if (op->seek_type == G_SEEK_END)
904 request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END;
905 append_request (file, request,
906 op->offset & 0xffffffff,
907 op->offset >> 32,
908 0,
909 &op->seq_nr);
910 op->state = SEEK_STATE_WROTE_REQUEST;
911 io_op->io_buffer = file->output_buffer->str;
912 io_op->io_size = file->output_buffer->len;
913 io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
914 return STATE_OP_WRITE;
915
916 /* wrote parts of output_buffer */
917 case SEEK_STATE_WROTE_REQUEST:
918 if (io_op->io_cancelled)
919 {
920 if (!op->sent_cancel)
921 unappend_request (file);
922 op->ret_val = -1;
923 g_set_error_literal (&op->ret_error,
924 G_IO_ERROR,
925 G_IO_ERROR_CANCELLED,
926 _("Operation was cancelled"));
927 return STATE_OP_DONE;
928 }
929
930 if (io_op->io_res < file->output_buffer->len)
931 {
932 g_string_remove_in_front (file->output_buffer,
933 io_op->io_res);
934 io_op->io_buffer = file->output_buffer->str;
935 io_op->io_size = file->output_buffer->len;
936 io_op->io_allow_cancel = FALSE;
937 return STATE_OP_WRITE;
938 }
939 g_string_truncate (file->output_buffer, 0);
940
941 op->state = SEEK_STATE_HANDLE_INPUT;
942 break;
943
944 /* No op */
945 case SEEK_STATE_HANDLE_INPUT:
946 if (io_op->cancelled && !op->sent_cancel)
947 {
948 op->sent_cancel = TRUE;
949 append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
950 op->seq_nr, 0, 0, NULL);
951 op->state = SEEK_STATE_WROTE_REQUEST;
952 io_op->io_buffer = file->output_buffer->str;
953 io_op->io_size = file->output_buffer->len;
954 io_op->io_allow_cancel = FALSE;
955 return STATE_OP_WRITE;
956 }
957
958 if (io_op->io_res > 0)
959 {
960 gsize unread_size = io_op->io_size - io_op->io_res;
961 g_string_set_size (file->input_buffer,
962 file->input_buffer->len - unread_size);
963 }
964
965 len = get_reply_header_missing_bytes (file->input_buffer);
966 if (len > 0)
967 {
968 gsize current_len = file->input_buffer->len;
969 g_string_set_size (file->input_buffer,
970 current_len + len);
971 io_op->io_buffer = file->input_buffer->str + current_len;
972 io_op->io_size = len;
973 io_op->io_allow_cancel = !op->sent_cancel;
974 return STATE_OP_READ;
975 }
976
977 /* Got full header */
978
979 {
980 GVfsDaemonSocketProtocolReply reply;
981 char *data;
982 data = decode_reply (file->input_buffer, &reply);
983
984 if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
985 reply.seq_nr == op->seq_nr)
986 {
987 op->ret_val = FALSE;
988 decode_error (&reply, data, &op->ret_error);
989 g_string_truncate (file->input_buffer, 0);
990 return STATE_OP_DONE;
991 }
992 else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SEEK_POS &&
993 reply.seq_nr == op->seq_nr)
994 {
995 op->ret_val = TRUE;
996 op->ret_offset = ((goffset)reply.arg2) << 32 | (goffset)reply.arg1;
997 g_string_truncate (file->input_buffer, 0);
998 return STATE_OP_DONE;
999 }
1000 /* Ignore other reply types */
1001 }
1002
1003 g_string_truncate (file->input_buffer, 0);
1004
1005 /* This wasn't interesting, read next reply */
1006 op->state = SEEK_STATE_HANDLE_INPUT;
1007 break;
1008
1009 default:
1010 g_assert_not_reached ();
1011 }
1012
1013 /* Clear io_op between non-op state switches */
1014 io_op->io_size = 0;
1015 io_op->io_res = 0;
1016 io_op->io_cancelled = FALSE;
1017 }
1018 }
1019
1020 static gboolean
g_daemon_file_output_stream_seek(GFileOutputStream * stream,goffset offset,GSeekType type,GCancellable * cancellable,GError ** error)1021 g_daemon_file_output_stream_seek (GFileOutputStream *stream,
1022 goffset offset,
1023 GSeekType type,
1024 GCancellable *cancellable,
1025 GError **error)
1026 {
1027 GDaemonFileOutputStream *file;
1028 SeekOperation op;
1029
1030 file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
1031
1032 if (!file->can_seek)
1033 {
1034 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
1035 _("Seek not supported on stream"));
1036 return FALSE;
1037 }
1038
1039 if (g_cancellable_set_error_if_cancelled (cancellable, error))
1040 return FALSE;
1041
1042 memset (&op, 0, sizeof (op));
1043 op.state = SEEK_STATE_INIT;
1044 op.offset = offset;
1045 op.seek_type = type;
1046
1047 if (!run_sync_state_machine (file, (state_machine_iterator)iterate_seek_state_machine,
1048 &op, cancellable, error))
1049 return FALSE; /* IO Error */
1050
1051 if (!op.ret_val)
1052 g_propagate_error (error, op.ret_error);
1053 else
1054 file->current_offset = op.ret_offset;
1055
1056 return op.ret_val;
1057 }
1058
1059 static StateOp
iterate_truncate_state_machine(GDaemonFileOutputStream * file,IOOperationData * io_op,TruncateOperation * op)1060 iterate_truncate_state_machine (GDaemonFileOutputStream *file,
1061 IOOperationData *io_op,
1062 TruncateOperation *op)
1063 {
1064 gsize len;
1065
1066 while (TRUE)
1067 {
1068 switch (op->state)
1069 {
1070 case TRUNCATE_STATE_INIT:
1071 append_request (file,
1072 G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_TRUNCATE,
1073 op->size & 0xffffffff,
1074 op->size >> 32,
1075 0,
1076 &op->seq_nr);
1077 op->state = TRUNCATE_STATE_WROTE_REQUEST;
1078 io_op->io_buffer = file->output_buffer->str;
1079 io_op->io_size = file->output_buffer->len;
1080 io_op->io_allow_cancel = TRUE;
1081 return STATE_OP_WRITE;
1082
1083 case TRUNCATE_STATE_WROTE_REQUEST:
1084 if (io_op->io_cancelled)
1085 {
1086 if (!op->sent_cancel)
1087 unappend_request (file);
1088 op->ret_val = FALSE;
1089 g_set_error_literal (&op->ret_error,
1090 G_IO_ERROR,
1091 G_IO_ERROR_CANCELLED,
1092 _("Operation was cancelled"));
1093 return STATE_OP_DONE;
1094 }
1095
1096 if (io_op->io_res < file->output_buffer->len)
1097 {
1098 g_string_remove_in_front (file->output_buffer, io_op->io_res);
1099 io_op->io_buffer = file->output_buffer->str;
1100 io_op->io_size = file->output_buffer->len;
1101 io_op->io_allow_cancel = FALSE;
1102 return STATE_OP_WRITE;
1103 }
1104 g_string_truncate (file->output_buffer, 0);
1105
1106 op->state = TRUNCATE_STATE_HANDLE_INPUT;
1107 break;
1108
1109 case TRUNCATE_STATE_HANDLE_INPUT:
1110 if (io_op->cancelled && !op->sent_cancel)
1111 {
1112 op->sent_cancel = TRUE;
1113 append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
1114 op->seq_nr, 0, 0, NULL);
1115 op->state = TRUNCATE_STATE_WROTE_REQUEST;
1116 io_op->io_buffer = file->output_buffer->str;
1117 io_op->io_size = file->output_buffer->len;
1118 io_op->io_allow_cancel = FALSE;
1119 return STATE_OP_WRITE;
1120 }
1121
1122 if (io_op->io_res > 0)
1123 {
1124 gsize unread_size = io_op->io_size - io_op->io_res;
1125 g_string_set_size (file->input_buffer,
1126 file->input_buffer->len - unread_size);
1127 }
1128
1129 len = get_reply_header_missing_bytes (file->input_buffer);
1130 if (len > 0)
1131 {
1132 gsize current_len = file->input_buffer->len;
1133 g_string_set_size (file->input_buffer, current_len + len);
1134 io_op->io_buffer = file->input_buffer->str + current_len;
1135 io_op->io_size = len;
1136 io_op->io_allow_cancel = !op->sent_cancel;
1137 return STATE_OP_READ;
1138 }
1139
1140 {
1141 GVfsDaemonSocketProtocolReply reply;
1142 char *data;
1143 data = decode_reply (file->input_buffer, &reply);
1144
1145 if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
1146 reply.seq_nr == op->seq_nr)
1147 {
1148 op->ret_val = FALSE;
1149 decode_error (&reply, data, &op->ret_error);
1150 g_string_truncate (file->input_buffer, 0);
1151 return STATE_OP_DONE;
1152 }
1153 else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_TRUNCATED &&
1154 reply.seq_nr == op->seq_nr)
1155 {
1156 op->ret_val = TRUE;
1157 g_string_truncate (file->input_buffer, 0);
1158 return STATE_OP_DONE;
1159 }
1160 /* Ignore other reply types */
1161 }
1162
1163 g_string_truncate (file->input_buffer, 0);
1164
1165 /* This wasn't interesting, read next reply */
1166 op->state = TRUNCATE_STATE_HANDLE_INPUT;
1167 break;
1168
1169 default:
1170 g_assert_not_reached ();
1171 }
1172
1173 /* Clear io_op between non-op state switches */
1174 io_op->io_size = 0;
1175 io_op->io_res = 0;
1176 io_op->io_cancelled = FALSE;
1177 }
1178 }
1179
1180 static gboolean
g_daemon_file_output_stream_can_truncate(GFileOutputStream * stream)1181 g_daemon_file_output_stream_can_truncate (GFileOutputStream *stream)
1182 {
1183 GDaemonFileOutputStream *file;
1184
1185 file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
1186
1187 return file->can_truncate;
1188 }
1189
1190 static gboolean
g_daemon_file_output_stream_truncate(GFileOutputStream * stream,goffset size,GCancellable * cancellable,GError ** error)1191 g_daemon_file_output_stream_truncate (GFileOutputStream *stream,
1192 goffset size,
1193 GCancellable *cancellable,
1194 GError **error)
1195 {
1196 GDaemonFileOutputStream *file;
1197 TruncateOperation op;
1198
1199 file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
1200
1201 if (!file->can_truncate)
1202 {
1203 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
1204 _("Truncate not supported on stream"));
1205 return FALSE;
1206 }
1207
1208 if (g_cancellable_set_error_if_cancelled (cancellable, error))
1209 return FALSE;
1210
1211 memset (&op, 0, sizeof (op));
1212 op.state = TRUNCATE_STATE_INIT;
1213 op.size = size;
1214
1215 if (!run_sync_state_machine (file, (state_machine_iterator)iterate_truncate_state_machine,
1216 &op, cancellable, error))
1217 return FALSE; /* IO Error */
1218
1219 if (!op.ret_val)
1220 g_propagate_error (error, op.ret_error);
1221
1222 return op.ret_val;
1223 }
1224
1225 static char *
g_daemon_file_output_stream_get_etag(GFileOutputStream * stream)1226 g_daemon_file_output_stream_get_etag (GFileOutputStream *stream)
1227 {
1228 GDaemonFileOutputStream *file;
1229
1230 file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
1231
1232 return g_strdup (file->etag);
1233 }
1234
1235 static StateOp
iterate_query_state_machine(GDaemonFileOutputStream * file,IOOperationData * io_op,QueryOperation * op)1236 iterate_query_state_machine (GDaemonFileOutputStream *file,
1237 IOOperationData *io_op,
1238 QueryOperation *op)
1239 {
1240 gsize len;
1241 guint32 request;
1242
1243 while (TRUE)
1244 {
1245 switch (op->state)
1246 {
1247 /* Initial state for read op */
1248 case QUERY_STATE_INIT:
1249 request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_QUERY_INFO;
1250 append_request (file, request,
1251 0,
1252 0,
1253 strlen (op->attributes),
1254 &op->seq_nr);
1255 g_string_append (file->output_buffer,
1256 op->attributes);
1257
1258 op->state = QUERY_STATE_WROTE_REQUEST;
1259 io_op->io_buffer = file->output_buffer->str;
1260 io_op->io_size = file->output_buffer->len;
1261 io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
1262 return STATE_OP_WRITE;
1263
1264 /* wrote parts of output_buffer */
1265 case QUERY_STATE_WROTE_REQUEST:
1266 if (io_op->io_cancelled)
1267 {
1268 if (!op->sent_cancel)
1269 unappend_request (file);
1270 op->info = NULL;
1271 g_set_error_literal (&op->ret_error,
1272 G_IO_ERROR,
1273 G_IO_ERROR_CANCELLED,
1274 _("Operation was cancelled"));
1275 return STATE_OP_DONE;
1276 }
1277
1278 if (io_op->io_res < file->output_buffer->len)
1279 {
1280 g_string_remove_in_front (file->output_buffer,
1281 io_op->io_res);
1282 io_op->io_buffer = file->output_buffer->str;
1283 io_op->io_size = file->output_buffer->len;
1284 io_op->io_allow_cancel = FALSE;
1285 return STATE_OP_WRITE;
1286 }
1287 g_string_truncate (file->output_buffer, 0);
1288
1289 op->state = QUERY_STATE_HANDLE_INPUT;
1290 break;
1291
1292 /* No op */
1293 case QUERY_STATE_HANDLE_INPUT:
1294 if (io_op->cancelled && !op->sent_cancel)
1295 {
1296 op->sent_cancel = TRUE;
1297 append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
1298 op->seq_nr, 0, 0, NULL);
1299 op->state = QUERY_STATE_WROTE_REQUEST;
1300 io_op->io_buffer = file->output_buffer->str;
1301 io_op->io_size = file->output_buffer->len;
1302 io_op->io_allow_cancel = FALSE;
1303 return STATE_OP_WRITE;
1304 }
1305
1306
1307 if (io_op->io_res > 0)
1308 {
1309 gsize unread_size = io_op->io_size - io_op->io_res;
1310 g_string_set_size (file->input_buffer,
1311 file->input_buffer->len - unread_size);
1312 }
1313
1314 len = get_reply_header_missing_bytes (file->input_buffer);
1315 if (len > 0)
1316 {
1317 gsize current_len = file->input_buffer->len;
1318 g_string_set_size (file->input_buffer,
1319 current_len + len);
1320 io_op->io_buffer = file->input_buffer->str + current_len;
1321 io_op->io_size = len;
1322 io_op->io_allow_cancel = !op->sent_cancel;
1323 return STATE_OP_READ;
1324 }
1325
1326 /* Got full header */
1327
1328 {
1329 GVfsDaemonSocketProtocolReply reply;
1330 char *data;
1331 data = decode_reply (file->input_buffer, &reply);
1332
1333 if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
1334 reply.seq_nr == op->seq_nr)
1335 {
1336 op->info = NULL;
1337 decode_error (&reply, data, &op->ret_error);
1338 g_string_truncate (file->input_buffer, 0);
1339 return STATE_OP_DONE;
1340 }
1341 else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_INFO &&
1342 reply.seq_nr == op->seq_nr)
1343 {
1344 op->info = gvfs_file_info_demarshal (data, reply.arg2);
1345 g_string_truncate (file->input_buffer, 0);
1346 return STATE_OP_DONE;
1347 }
1348 /* Ignore other reply types */
1349 }
1350
1351 g_string_truncate (file->input_buffer, 0);
1352
1353 /* This wasn't interesting, read next reply */
1354 op->state = QUERY_STATE_HANDLE_INPUT;
1355 break;
1356
1357 default:
1358 g_assert_not_reached ();
1359 }
1360
1361 /* Clear io_op between non-op state switches */
1362 io_op->io_size = 0;
1363 io_op->io_res = 0;
1364 io_op->io_cancelled = FALSE;
1365 }
1366 }
1367
1368 static GFileInfo *
g_daemon_file_output_stream_query_info(GFileOutputStream * stream,const char * attributes,GCancellable * cancellable,GError ** error)1369 g_daemon_file_output_stream_query_info (GFileOutputStream *stream,
1370 const char *attributes,
1371 GCancellable *cancellable,
1372 GError **error)
1373 {
1374 GDaemonFileOutputStream *file;
1375 QueryOperation op;
1376
1377 file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
1378
1379 if (g_cancellable_set_error_if_cancelled (cancellable, error))
1380 return NULL;
1381
1382 memset (&op, 0, sizeof (op));
1383 op.state = QUERY_STATE_INIT;
1384 if (attributes)
1385 op.attributes = (char *)attributes;
1386 else
1387 op.attributes = "";
1388
1389 if (!run_sync_state_machine (file, (state_machine_iterator)iterate_query_state_machine,
1390 &op, cancellable, error))
1391 return NULL; /* IO Error */
1392
1393 if (op.info == NULL)
1394 g_propagate_error (error, op.ret_error);
1395
1396 return op.info;
1397 }
1398
1399 /************************************************************************
1400 * Async I/O Code *
1401 ************************************************************************/
1402
/* Bookkeeping for one asynchronously driven state machine run. */
typedef struct AsyncIterator AsyncIterator;

/* Called with the task once the state machine returns STATE_OP_DONE. */
typedef void (*AsyncIteratorDone) (GTask *task);

struct AsyncIterator {
  AsyncIteratorDone done_cb;       /* invoked by async_iterate() on STATE_OP_DONE */
  IOOperationData io_data;         /* I/O request/result exchanged with the state machine */
  state_machine_iterator iterator; /* the state machine step function */
  GTask *task;                     /* supplies source object, task data, priority, cancellable */
};

/* Forward declaration: async_op_handle() re-enters async_iterate(). */
static void async_iterate (AsyncIterator *iterator);
1415
1416 static void
async_op_handle(AsyncIterator * iterator,gssize res,GError * io_error)1417 async_op_handle (AsyncIterator *iterator,
1418 gssize res,
1419 GError *io_error)
1420 {
1421 IOOperationData *io_data = &iterator->io_data;
1422
1423 if (io_error != NULL)
1424 {
1425 if (error_is_cancel (io_error))
1426 {
1427 io_data->io_res = 0;
1428 io_data->io_cancelled = TRUE;
1429 }
1430 else
1431 {
1432 g_task_return_new_error (iterator->task, G_IO_ERROR, G_IO_ERROR_FAILED,
1433 _("Error in stream protocol: %s"), io_error->message);
1434 g_object_unref (iterator->task);
1435 g_free (iterator);
1436 return;
1437 }
1438 }
1439 else if (res == 0 && io_data->io_size != 0)
1440 {
1441 g_task_return_new_error (iterator->task, G_IO_ERROR, G_IO_ERROR_FAILED,
1442 _("Error in stream protocol: %s"), _("End of stream"));
1443 g_object_unref (iterator->task);
1444 g_free (iterator);
1445 return;
1446 }
1447 else
1448 {
1449 io_data->io_res = res;
1450 io_data->io_cancelled = FALSE;
1451 }
1452
1453 async_iterate (iterator);
1454 }
1455
1456 static void
async_read_op_callback(GObject * source_object,GAsyncResult * res,gpointer user_data)1457 async_read_op_callback (GObject *source_object,
1458 GAsyncResult *res,
1459 gpointer user_data)
1460 {
1461 GInputStream *stream = G_INPUT_STREAM (source_object);
1462 gssize count_read;
1463 GError *error = NULL;
1464
1465 count_read = g_input_stream_read_finish (stream, res, &error);
1466
1467 async_op_handle ((AsyncIterator *)user_data, count_read, error);
1468 if (error)
1469 g_error_free (error);
1470 }
1471
1472 static void
async_skip_op_callback(GObject * source_object,GAsyncResult * res,gpointer user_data)1473 async_skip_op_callback (GObject *source_object,
1474 GAsyncResult *res,
1475 gpointer user_data)
1476 {
1477 GInputStream *stream = G_INPUT_STREAM (source_object);
1478 gssize count_skipped;
1479 GError *error = NULL;
1480
1481 count_skipped = g_input_stream_skip_finish (stream, res, &error);
1482
1483 async_op_handle ((AsyncIterator *)user_data, count_skipped, error);
1484 if (error)
1485 g_error_free (error);
1486 }
1487
1488 static void
async_write_op_callback(GObject * source_object,GAsyncResult * res,gpointer user_data)1489 async_write_op_callback (GObject *source_object,
1490 GAsyncResult *res,
1491 gpointer user_data)
1492 {
1493 GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1494 gssize bytes_written;
1495 GError *error = NULL;
1496
1497 bytes_written = g_output_stream_write_finish (stream, res, &error);
1498
1499 async_op_handle ((AsyncIterator *)user_data, bytes_written, error);
1500 if (error)
1501 g_error_free (error);
1502 }
1503
1504 static void
async_iterate(AsyncIterator * iterator)1505 async_iterate (AsyncIterator *iterator)
1506 {
1507 IOOperationData *io_data = &iterator->io_data;
1508 GDaemonFileOutputStream *file;
1509 GCancellable *cancellable = g_task_get_cancellable (iterator->task);
1510 StateOp io_op;
1511
1512 io_data->cancelled = g_cancellable_is_cancelled (cancellable);
1513
1514 file = G_DAEMON_FILE_OUTPUT_STREAM (g_task_get_source_object (iterator->task));
1515 io_op = iterator->iterator (file, io_data, g_task_get_task_data (iterator->task));
1516
1517 if (io_op == STATE_OP_DONE)
1518 {
1519 iterator->done_cb (iterator->task);
1520 g_free (iterator);
1521 return;
1522 }
1523
1524 /* TODO: Handle allow_cancel... */
1525
1526 if (io_op == STATE_OP_READ)
1527 {
1528 g_input_stream_read_async (file->data_stream,
1529 io_data->io_buffer, io_data->io_size,
1530 g_task_get_priority (iterator->task),
1531 io_data->io_allow_cancel ? cancellable : NULL,
1532 async_read_op_callback, iterator);
1533 }
1534 else if (io_op == STATE_OP_SKIP)
1535 {
1536 g_input_stream_skip_async (file->data_stream,
1537 io_data->io_size,
1538 g_task_get_priority (iterator->task),
1539 io_data->io_allow_cancel ? cancellable : NULL,
1540 async_skip_op_callback, iterator);
1541 }
1542 else if (io_op == STATE_OP_WRITE)
1543 {
1544 g_output_stream_write_async (file->command_stream,
1545 io_data->io_buffer, io_data->io_size,
1546 g_task_get_priority (iterator->task),
1547 io_data->io_allow_cancel ? cancellable : NULL,
1548 async_write_op_callback, iterator);
1549 }
1550 else
1551 g_assert_not_reached ();
1552 }
1553
1554 static void
run_async_state_machine(GTask * task,state_machine_iterator iterator_cb,AsyncIteratorDone done_cb)1555 run_async_state_machine (GTask *task,
1556 state_machine_iterator iterator_cb,
1557 AsyncIteratorDone done_cb)
1558 {
1559 AsyncIterator *iterator;
1560
1561 iterator = g_new0 (AsyncIterator, 1);
1562 iterator->iterator = iterator_cb;
1563 iterator->done_cb = done_cb;
1564 iterator->task = task;
1565
1566 async_iterate (iterator);
1567 }
1568
1569 static void
async_write_done(GTask * task)1570 async_write_done (GTask *task)
1571 {
1572 WriteOperation *op;
1573 gssize count_written;
1574 GError *error;
1575
1576 op = g_task_get_task_data (task);
1577
1578 count_written = op->ret_val;
1579 error = op->ret_error;
1580
1581 if (count_written == -1)
1582 g_task_return_error (task, error);
1583 else
1584 g_task_return_int (task, count_written);
1585
1586 g_object_unref (task);
1587 }
1588
1589 static void
g_daemon_file_output_stream_write_async(GOutputStream * stream,const void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer data)1590 g_daemon_file_output_stream_write_async (GOutputStream *stream,
1591 const void *buffer,
1592 gsize count,
1593 int io_priority,
1594 GCancellable *cancellable,
1595 GAsyncReadyCallback callback,
1596 gpointer data)
1597 {
1598 WriteOperation *op;
1599 GTask *task;
1600
1601 task = g_task_new (stream, cancellable, callback, data);
1602 g_task_set_priority (task, io_priority);
1603 g_task_set_source_tag (task, g_daemon_file_output_stream_write_async);
1604
1605 /* Limit for sanity and to avoid 32bit overflow */
1606 if (count > MAX_WRITE_SIZE)
1607 count = MAX_WRITE_SIZE;
1608
1609 op = g_new0 (WriteOperation, 1);
1610 op->state = WRITE_STATE_INIT;
1611 op->buffer = buffer;
1612 op->buffer_size = count;
1613
1614 g_task_set_task_data (task, op, g_free);
1615
1616 run_async_state_machine (task,
1617 (state_machine_iterator)iterate_write_state_machine,
1618 async_write_done);
1619 }
1620
1621 static gssize
g_daemon_file_output_stream_write_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)1622 g_daemon_file_output_stream_write_finish (GOutputStream *stream,
1623 GAsyncResult *result,
1624 GError **error)
1625 {
1626 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1627 g_return_val_if_fail (g_async_result_is_tagged (result, g_daemon_file_output_stream_write_async), -1);
1628
1629 return g_task_propagate_int (G_TASK (result), error);
1630 }
1631
1632 static void
async_close_done(GTask * task)1633 async_close_done (GTask *task)
1634 {
1635 GDaemonFileOutputStream *file;
1636 CloseOperation *op;
1637 gboolean result;
1638 GError *error;
1639 GCancellable *cancellable = g_task_get_cancellable (task);
1640
1641 file = G_DAEMON_FILE_OUTPUT_STREAM (g_task_get_source_object (task));
1642 op = g_task_get_task_data (task);
1643
1644 result = op->ret_val;
1645 error = op->ret_error;
1646
1647 if (result)
1648 result = g_output_stream_close (file->command_stream, cancellable, &error);
1649 else
1650 g_output_stream_close (file->command_stream, cancellable, NULL);
1651
1652 if (result)
1653 result = g_input_stream_close (file->data_stream, cancellable, &error);
1654 else
1655 g_input_stream_close (file->data_stream, cancellable, NULL);
1656
1657 if (!result)
1658 g_task_return_error (task, error);
1659 else
1660 g_task_return_boolean (task, TRUE);
1661
1662 g_object_unref (task);
1663 }
1664
1665 static void
g_daemon_file_output_stream_close_async(GOutputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer data)1666 g_daemon_file_output_stream_close_async (GOutputStream *stream,
1667 int io_priority,
1668 GCancellable *cancellable,
1669 GAsyncReadyCallback callback,
1670 gpointer data)
1671 {
1672 CloseOperation *op;
1673 GTask *task;
1674
1675 task = g_task_new (stream, cancellable, callback, data);
1676 g_task_set_priority (task, io_priority);
1677 g_task_set_source_tag (task, g_daemon_file_output_stream_close_async);
1678
1679 op = g_new0 (CloseOperation, 1);
1680 op->state = CLOSE_STATE_INIT;
1681
1682 g_task_set_task_data (task, op, g_free);
1683
1684 run_async_state_machine (task,
1685 (state_machine_iterator)iterate_close_state_machine,
1686 async_close_done);
1687 }
1688
1689 static gboolean
g_daemon_file_output_stream_close_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)1690 g_daemon_file_output_stream_close_finish (GOutputStream *stream,
1691 GAsyncResult *result,
1692 GError **error)
1693 {
1694 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1695 g_return_val_if_fail (g_async_result_is_tagged (result, g_daemon_file_output_stream_close_async), FALSE);
1696
1697 return g_task_propagate_boolean (G_TASK (result), error);
1698 }
1699
1700 static void
async_query_done(GTask * task)1701 async_query_done (GTask *task)
1702 {
1703 QueryOperation *op;
1704 GFileInfo *info;
1705 GError *error;
1706
1707 op = g_task_get_task_data (task);
1708
1709 info = op->info;
1710 error = op->ret_error;
1711
1712 if (info == NULL)
1713 g_task_return_error (task, error);
1714 else
1715 g_task_return_pointer (task, info, g_object_unref);
1716
1717 g_object_unref (task);
1718 }
1719
1720 static void
g_daemon_file_output_stream_query_info_async(GFileOutputStream * stream,const char * attributes,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1721 g_daemon_file_output_stream_query_info_async (GFileOutputStream *stream,
1722 const char *attributes,
1723 int io_priority,
1724 GCancellable *cancellable,
1725 GAsyncReadyCallback callback,
1726 gpointer user_data)
1727 {
1728 QueryOperation *op;
1729 GTask *task;
1730
1731 task = g_task_new (stream, cancellable, callback, user_data);
1732 g_task_set_priority (task, io_priority);
1733 g_task_set_source_tag (task, g_daemon_file_output_stream_query_info_async);
1734
1735 op = g_new0 (QueryOperation, 1);
1736 op->state = QUERY_STATE_INIT;
1737 if (attributes)
1738 op->attributes = g_strdup (attributes);
1739 else
1740 op->attributes = g_strdup ("");
1741
1742 g_task_set_task_data (task, op, (GDestroyNotify)query_operation_free);
1743
1744 run_async_state_machine (task,
1745 (state_machine_iterator)iterate_query_state_machine,
1746 async_query_done);
1747 }
1748
1749 static GFileInfo *
g_daemon_file_output_stream_query_info_finish(GFileOutputStream * stream,GAsyncResult * result,GError ** error)1750 g_daemon_file_output_stream_query_info_finish (GFileOutputStream *stream,
1751 GAsyncResult *result,
1752 GError **error)
1753 {
1754 g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
1755 g_return_val_if_fail (g_async_result_is_tagged (result, g_daemon_file_output_stream_query_info_async), NULL);
1756
1757 return g_task_propagate_pointer (G_TASK (result), error);
1758 }
1759