1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2008-2013 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18  *
19  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21  */
22 
23 #include "amanda.h"
24 #include "amxfer.h"
25 
26 /* parent class for XferElement */
27 static GObjectClass *parent_class = NULL;
28 
29 /* parent class for XferDest, XferFilter, and XferSource */
30 static XferElementClass *xfer_element_class = NULL;
31 
32 /***********************
33  * XferElement */
34 
35 static void
xfer_element_init(XferElement * xe)36 xfer_element_init(
37     XferElement *xe)
38 {
39     xe->xfer = NULL;
40     xe->output_mech = XFER_MECH_NONE;
41     xe->input_mech = XFER_MECH_NONE;
42     xe->upstream = xe->downstream = NULL;
43     xe->_input_fd = xe->_output_fd = -1;
44     xe->repr = NULL;
45     xe->must_drain = FALSE;
46 }
47 
48 static gboolean
xfer_element_setup_impl(XferElement * elt G_GNUC_UNUSED)49 xfer_element_setup_impl(
50     XferElement *elt G_GNUC_UNUSED)
51 {
52     return TRUE; /* success */
53 }
54 
55 static gboolean
xfer_element_set_size_impl(XferElement * elt G_GNUC_UNUSED,gint64 size G_GNUC_UNUSED)56 xfer_element_set_size_impl(
57     XferElement *elt G_GNUC_UNUSED,
58     gint64       size G_GNUC_UNUSED)
59 {
60     elt->size = size;
61 
62     return TRUE; /* success */
63 }
64 
65 static gboolean
xfer_element_start_impl(XferElement * elt G_GNUC_UNUSED)66 xfer_element_start_impl(
67     XferElement *elt G_GNUC_UNUSED)
68 {
69     return FALSE; /* will not send XMSG_DONE */
70 }
71 
72 static gboolean
xfer_element_cancel_impl(XferElement * elt,gboolean expect_eof)73 xfer_element_cancel_impl(
74     XferElement *elt,
75     gboolean expect_eof)
76 {
77     elt->cancelled = TRUE;
78     elt->expect_eof = expect_eof;
79     return elt->can_generate_eof;
80 }
81 
82 static gpointer
xfer_element_pull_buffer_impl(XferElement * elt G_GNUC_UNUSED,size_t * size G_GNUC_UNUSED)83 xfer_element_pull_buffer_impl(
84     XferElement *elt G_GNUC_UNUSED,
85     size_t *size G_GNUC_UNUSED)
86 {
87     return NULL;
88 }
89 
90 static void
xfer_element_push_buffer_impl(XferElement * elt G_GNUC_UNUSED,gpointer buf G_GNUC_UNUSED,size_t size G_GNUC_UNUSED)91 xfer_element_push_buffer_impl(
92     XferElement *elt G_GNUC_UNUSED,
93     gpointer buf G_GNUC_UNUSED,
94     size_t size G_GNUC_UNUSED)
95 {
96 }
97 
98 static xfer_element_mech_pair_t *
xfer_element_get_mech_pairs_impl(XferElement * elt)99 xfer_element_get_mech_pairs_impl(
100     XferElement *elt)
101 {
102     return XFER_ELEMENT_GET_CLASS(elt)->mech_pairs;
103 }
104 
105 static char *
xfer_element_repr_impl(XferElement * elt)106 xfer_element_repr_impl(
107     XferElement *elt)
108 {
109     if (!elt->repr) {
110 	elt->repr = newvstrallocf(elt->repr, "<%s@%p>",
111 		G_OBJECT_TYPE_NAME(G_OBJECT(elt)),
112 		elt);
113     }
114 
115     return elt->repr;
116 }
117 
118 static void
xfer_element_finalize(GObject * obj_self)119 xfer_element_finalize(
120     GObject * obj_self)
121 {
122     XferElement *elt = XFER_ELEMENT(obj_self);
123     gint fd;
124 
125     /* free the repr cache */
126     if (elt->repr) g_free(elt->repr);
127 
128     /* close up the input/output file descriptors, being careful to do so
129      * atomically, and making any errors doing so into mere warnings */
130     fd = xfer_element_swap_input_fd(elt, -1);
131     if (fd != -1 && close(fd) != 0)
132 	g_warning("error closing fd %d: %s", fd, strerror(errno));
133     fd = xfer_element_swap_output_fd(elt, -1);
134     if (fd != -1 && close(fd) != 0)
135 	g_warning("error closing fd %d: %s", fd, strerror(errno));
136 
137     /* chain up */
138     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
139 }
140 
141 static void
xfer_element_class_init(XferElementClass * klass)142 xfer_element_class_init(
143     XferElementClass * klass)
144 {
145     GObjectClass *goc = (GObjectClass*) klass;
146 
147     klass->repr = xfer_element_repr_impl;
148     klass->setup = xfer_element_setup_impl;
149     klass->set_size = xfer_element_set_size_impl;
150     klass->start = xfer_element_start_impl;
151     klass->cancel = xfer_element_cancel_impl;
152     klass->pull_buffer = xfer_element_pull_buffer_impl;
153     klass->push_buffer = xfer_element_push_buffer_impl;
154     klass->get_mech_pairs = xfer_element_get_mech_pairs_impl;
155 
156     goc->finalize = xfer_element_finalize;
157 
158     klass->perl_class = NULL;
159 
160     parent_class = g_type_class_peek_parent(klass);
161     xfer_element_class = klass;
162 }
163 
164 GType
xfer_element_get_type(void)165 xfer_element_get_type(void)
166 {
167     static GType type = 0;
168 
169     if G_UNLIKELY(type == 0) {
170         static const GTypeInfo info = {
171             sizeof (XferElementClass),
172             (GBaseInitFunc) NULL,
173             (GBaseFinalizeFunc) NULL,
174             (GClassInitFunc) xfer_element_class_init,
175             (GClassFinalizeFunc) NULL,
176             NULL /* class_data */,
177             sizeof (XferElement),
178             0 /* n_preallocs */,
179             (GInstanceInitFunc) xfer_element_init,
180             NULL
181         };
182 
183         type = g_type_register_static (G_TYPE_OBJECT, "XferElement", &info,
184                                        (GTypeFlags)G_TYPE_FLAG_ABSTRACT);
185     }
186 
187     return type;
188 }
189 
190 /*
191  * Method stubs
192  */
193 
194 void
xfer_element_unref(XferElement * elt)195 xfer_element_unref(
196     XferElement *elt)
197 {
198     if (elt) g_object_unref(elt);
199 }
200 
201 char *
xfer_element_repr(XferElement * elt)202 xfer_element_repr(
203     XferElement *elt)
204 {
205     return XFER_ELEMENT_GET_CLASS(elt)->repr(elt);
206 }
207 
208 gboolean
xfer_element_setup(XferElement * elt)209 xfer_element_setup(
210     XferElement *elt)
211 {
212     return XFER_ELEMENT_GET_CLASS(elt)->setup(elt);
213 }
214 
215 gboolean
xfer_element_set_size(XferElement * elt,gint64 size)216 xfer_element_set_size(
217     XferElement *elt,
218     gint64       size)
219 {
220     return XFER_ELEMENT_GET_CLASS(elt)->set_size(elt, size);
221 }
222 
223 gboolean
xfer_element_start(XferElement * elt)224 xfer_element_start(
225     XferElement *elt)
226 {
227     return XFER_ELEMENT_GET_CLASS(elt)->start(elt);
228 }
229 
230 gboolean
xfer_element_cancel(XferElement * elt,gboolean expect_eof)231 xfer_element_cancel(
232     XferElement *elt,
233     gboolean expect_eof)
234 {
235     return XFER_ELEMENT_GET_CLASS(elt)->cancel(elt, expect_eof);
236 }
237 
238 gpointer
xfer_element_pull_buffer(XferElement * elt,size_t * size)239 xfer_element_pull_buffer(
240     XferElement *elt,
241     size_t *size)
242 {
243     xfer_status status;
244     /* Make sure that the xfer is running before calling upstream's
245      * pull_buffer method; this avoids a race condition where upstream
246      * hasn't finished its xfer_element_start yet, and isn't ready for
247      * a pull */
248     g_mutex_lock(elt->xfer->status_mutex);
249     status = elt->xfer->status;
250     g_mutex_unlock(elt->xfer->status_mutex);
251     if (status == XFER_START)
252 	wait_until_xfer_running(elt->xfer);
253 
254     return XFER_ELEMENT_GET_CLASS(elt)->pull_buffer(elt, size);
255 }
256 
257 void
xfer_element_push_buffer(XferElement * elt,gpointer buf,size_t size)258 xfer_element_push_buffer(
259     XferElement *elt,
260     gpointer buf,
261     size_t size)
262 {
263     /* There is no race condition with push_buffer, because downstream
264      * elements are started first. */
265     XFER_ELEMENT_GET_CLASS(elt)->push_buffer(elt, buf, size);
266 }
267 
268 xfer_element_mech_pair_t *
xfer_element_get_mech_pairs(XferElement * elt)269 xfer_element_get_mech_pairs(
270 	XferElement *elt)
271 {
272     return XFER_ELEMENT_GET_CLASS(elt)->get_mech_pairs(elt);
273 }
274 
275 /****
276  * Utilities
277  */
278 
279 void
xfer_element_drain_buffers(XferElement * upstream)280 xfer_element_drain_buffers(
281     XferElement *upstream)
282 {
283     gpointer buf;
284     size_t size;
285 
286     while ((buf =xfer_element_pull_buffer(upstream, &size))) {
287 	amfree(buf);
288     }
289 }
290 
/*
 * Read and throw away data from a file descriptor, one 1024-byte scratch
 * buffer at a time, stopping as soon as full_read returns fewer bytes than
 * the buffer holds.
 */
void
xfer_element_drain_fd(int fd)
{
    char scratch[1024];
    size_t nread;

    do {
        nread = full_read(fd, scratch, sizeof(scratch));
    } while (nread >= sizeof(scratch));
}
304 
305