1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2008-2013 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18  *
19  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21  */
22 
23 #include "amanda.h"
24 #include "amxfer.h"
25 #include "element-glue.h"
26 #include "directtcp.h"
27 #include "util.h"
28 #include "sockaddr-util.h"
29 #include "stream.h"
30 #include "debug.h"
31 
32 /*
33  * Instance definition
34  */
35 
36 typedef struct XferElementGlue_ {
37     XferElement __parent__;
38 
39     /* instructions to push_buffer_impl */
40     enum {
41 	PUSH_TO_RING_BUFFER,
42 	PUSH_TO_FD, /* write to write_fd */
43 	PUSH_INVALID,
44 
45 	PUSH_ACCEPT_FIRST = (1 << 16),
46 	PUSH_CONNECT_FIRST = (2 << 16),
47     } on_push;
48 
49     /* instructions to pull_buffer_impl */
50     enum {
51 	PULL_FROM_RING_BUFFER,
52 	PULL_FROM_FD, /* read from read_fd */
53 	PULL_INVALID,
54 
55 	PULL_ACCEPT_FIRST = (1 << 16),
56 	PULL_CONNECT_FIRST = (2 << 16),
57     } on_pull;
58 
59     int *write_fdp;
60     int *read_fdp;
61 
62     gboolean need_thread;
63 
64     /* the stuff we might use, depending on what flavor of glue we're
65      * providing.. */
66     int pipe[2];
67     int input_listen_socket, output_listen_socket;
68     int input_data_socket, output_data_socket;
69     int read_fd, write_fd;
70 
71     /* a ring buffer of ptr/size pairs with semaphores */
72     struct { gpointer buf; size_t size; } *ring;
73     amsemaphore_t *ring_used_sem, *ring_free_sem;
74     gint ring_head, ring_tail;
75 
76     GThread *thread;
77     GThreadFunc threadfunc;
78 } XferElementGlue;
79 
80 /*
81  * Class definition
82  */
83 
84 typedef struct XferElementGlueClass_ {
85     XferElementClass __parent__;
86 } XferElementGlueClass;
87 
88 static GObjectClass *parent_class = NULL;
89 
90 /*
91  * Utility functions, etc.
92  */
93 
94 static void
make_pipe(XferElementGlue * self)95 make_pipe(
96     XferElementGlue *self)
97 {
98     if (pipe(self->pipe) < 0)
99 	g_critical(_("Could not create pipe: %s"), strerror(errno));
100 }
101 
102 static void
send_xfer_done(XferElementGlue * self)103 send_xfer_done(
104     XferElementGlue *self)
105 {
106     xfer_queue_message(XFER_ELEMENT(self)->xfer,
107 	    xmsg_new((XferElement *)self, XMSG_DONE, 0));
108 }
109 
110 static gboolean
do_directtcp_listen(XferElement * elt,int * sockp,DirectTCPAddr ** addrsp)111 do_directtcp_listen(
112     XferElement *elt,
113     int *sockp,
114     DirectTCPAddr **addrsp)
115 {
116     int sock;
117     sockaddr_union data_addr;
118     DirectTCPAddr *addrs;
119     socklen_t len;
120     struct addrinfo *res;
121     struct addrinfo *res_addr;
122     sockaddr_union *addr = NULL;
123 
124     if (resolve_hostname("localhost", 0, &res, NULL) != 0) {
125 	xfer_cancel_with_error(elt, "resolve_hostname(): %s", strerror(errno));
126 	return FALSE;
127     }
128     for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) {
129 	if (res_addr->ai_family == AF_INET) {
130             addr = (sockaddr_union *)res_addr->ai_addr;
131             break;
132         }
133     }
134     if (!addr) {
135         addr = (sockaddr_union *)res->ai_addr;
136     }
137 
138     sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0);
139     if (sock < 0) {
140 	xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
141 	return FALSE;
142     }
143 
144     len = SS_LEN(addr);
145     if (bind(sock, (struct sockaddr *)addr, len) != 0) {
146 	xfer_cancel_with_error(elt, "bind(): %s", strerror(errno));
147 	freeaddrinfo(res);
148 	return FALSE;
149     }
150 
151     if (listen(sock, 1) < 0) {
152 	xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
153 	return FALSE;
154     }
155 
156     /* TODO: which addresses should this display? all ifaces? localhost? */
157     len = sizeof(data_addr);
158     if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0)
159 	error("getsockname(): %s", strerror(errno));
160 
161     addrs = g_new0(DirectTCPAddr, 2);
162     copy_sockaddr(&addrs[0], &data_addr);
163     *addrsp = addrs;
164 
165     return TRUE;
166 }
167 
168 static gboolean
prolong_accept(gpointer data)169 prolong_accept(
170     gpointer data)
171 {
172     return !XFER_ELEMENT(data)->cancelled;
173 }
174 
175 static int
do_directtcp_accept(XferElementGlue * self,int * socketp)176 do_directtcp_accept(
177     XferElementGlue *self,
178     int *socketp)
179 {
180     int sock;
181     time_t timeout_time = time(NULL) + 60;
182     g_assert(*socketp != -1);
183 
184     if ((sock = interruptible_accept(*socketp, NULL, NULL,
185 				     prolong_accept, self, timeout_time)) == -1) {
186 	/* if the accept was interrupted due to a cancellation, then do not
187 	 * add a further error message */
188 	if (errno == 0 && XFER_ELEMENT(self)->cancelled)
189 	    return -1;
190 
191 	xfer_cancel_with_error(XFER_ELEMENT(self),
192 	    _("Error accepting incoming connection: %s"), strerror(errno));
193 	wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
194 	return -1;
195     }
196 
197     /* close the listening socket now, for good measure */
198     close(*socketp);
199     *socketp = -1;
200 
201     g_debug("do_directtcp_accept: %d", sock);
202 
203     return sock;
204 }
205 
206 static int
do_directtcp_connect(XferElementGlue * self,DirectTCPAddr * addrs)207 do_directtcp_connect(
208     XferElementGlue *self,
209     DirectTCPAddr *addrs)
210 {
211     XferElement *elt = XFER_ELEMENT(self);
212     sockaddr_union addr;
213     int sock;
214 #ifdef WORKING_IPV6
215     char strsockaddr[INET6_ADDRSTRLEN + 20];
216 #else
217     char strsockaddr[INET_ADDRSTRLEN + 20];
218 #endif
219 
220     if (!addrs) {
221 	g_debug("element-glue got no directtcp addresses to connect to!");
222 	if (!elt->cancelled) {
223 	    xfer_cancel_with_error(elt,
224 		"%s got no directtcp addresses to connect to",
225 		xfer_element_repr(elt));
226 	}
227 	goto cancel_wait;
228     }
229 
230     /* set up the sockaddr -- IPv4 only */
231     copy_sockaddr(&addr, addrs);
232 
233     str_sockaddr_r(&addr, strsockaddr, sizeof(strsockaddr));
234 
235     if (strncmp(strsockaddr,"255.255.255.255:", 16) == 0) {
236 	char  buffer[32770];
237 	char *s;
238 	int   size;
239 	char *data_host;
240 	int   data_port;
241 
242 	g_debug("do_directtcp_connect making indirect data connection to %s",
243 		strsockaddr);
244 	data_port = SU_GET_PORT(&addr);
245 	sock = stream_client("localhost", data_port,
246                              STREAM_BUFSIZE, 0, NULL, 0);
247 	if (sock < 0) {
248 	    xfer_cancel_with_error(elt, "stream_client(): %s", strerror(errno));
249 	    goto cancel_wait;
250 	}
251 	size = full_read(sock, buffer, 32768);
252 	if (size < 0 ) {
253 	    xfer_cancel_with_error(elt, "failed to read from indirecttcp: %s",
254 				   strerror(errno));
255 	    goto cancel_wait;
256 	}
257 	close(sock);
258 	buffer[size++] = ' ';
259 	buffer[size] = '\0';
260 	if ((s = strchr(buffer, ':')) == NULL) {
261 	    xfer_cancel_with_error(elt,
262 				   "Failed to parse indirect data stream: %s",
263 				   buffer);
264 	    goto cancel_wait;
265 	}
266 	*s++ = '\0';
267 	data_host = buffer;
268 	data_port = atoi(s);
269 
270 	str_to_sockaddr(data_host, &addr);
271 	SU_SET_PORT(&addr, data_port);
272 
273 	str_sockaddr_r(&addr, strsockaddr, sizeof(strsockaddr));
274     }
275 
276     sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
277 
278     g_debug("do_directtcp_connect making data connection to %s", strsockaddr);
279 
280     if (sock < 0) {
281 	xfer_cancel_with_error(elt,
282 	    "socket(): %s", strerror(errno));
283 	goto cancel_wait;
284     }
285     if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
286 	xfer_cancel_with_error(elt,
287 	    "connect(): %s", strerror(errno));
288 	goto cancel_wait;
289     }
290 
291     g_debug("do_directtcp_connect: connected to %s, fd %d", strsockaddr, sock);
292 
293     return sock;
294 
295 cancel_wait:
296     wait_until_xfer_cancelled(elt->xfer);
297     return -1;
298 }
299 
300 #define GLUE_BUFFER_SIZE 32768
301 #define GLUE_RING_BUFFER_SIZE 32
302 
303 #define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
304 
305 /*
306  * fd handling
307  */
308 
309 /* if self->read_fdp or self->write_fdp are pointing to this integer, then they
310  * should be redirected to point to the upstream's output_fd or downstream's
311  * input_fd, respectively, at the first call to get_read_fd or get_write_fd,
312  * respectively. */
313 static int neighboring_element_fd = -1;
314 
315 #define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd)
316 static int
_get_read_fd(XferElementGlue * self)317 _get_read_fd(XferElementGlue *self)
318 {
319     if (!self->read_fdp)
320 	return -1; /* shouldn't happen.. */
321 
322     if (self->read_fdp == &neighboring_element_fd) {
323 	XferElement *elt = XFER_ELEMENT(self);
324 	self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1);
325     } else {
326 	self->read_fd = *self->read_fdp;
327 	*self->read_fdp = -1;
328     }
329     self->read_fdp = NULL;
330     return self->read_fd;
331 }
332 
333 #define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd)
334 static int
_get_write_fd(XferElementGlue * self)335 _get_write_fd(XferElementGlue *self)
336 {
337     if (!self->write_fdp)
338 	return -1; /* shouldn't happen.. */
339 
340     if (self->write_fdp == &neighboring_element_fd) {
341 	XferElement *elt = XFER_ELEMENT(self);
342 	self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1);
343     } else {
344 	self->write_fd = *self->write_fdp;
345 	*self->write_fdp = -1;
346     }
347     self->write_fdp = NULL;
348     return self->write_fd;
349 }
350 
351 static int
close_read_fd(XferElementGlue * self)352 close_read_fd(XferElementGlue *self)
353 {
354     int fd = get_read_fd(self);
355     self->read_fd = -1;
356     return close(fd);
357 }
358 
359 static int
close_write_fd(XferElementGlue * self)360 close_write_fd(XferElementGlue *self)
361 {
362     int fd = get_write_fd(self);
363     self->write_fd = -1;
364     return close(fd);
365 }
366 
367 /*
368  * Worker thread utility functions
369  */
370 
371 static void
pull_and_write(XferElementGlue * self)372 pull_and_write(XferElementGlue *self)
373 {
374     XferElement *elt = XFER_ELEMENT(self);
375     int fd = get_write_fd(self);
376     self->write_fdp = NULL;
377     crc32_init(&elt->crc);
378 
379     while (!elt->cancelled) {
380 	size_t len;
381 	char *buf;
382 
383 	/* get a buffer from upstream */
384 	buf = xfer_element_pull_buffer(elt->upstream, &len);
385 	if (!buf)
386 	    break;
387 
388 	/* write it */
389 	if (!elt->downstream->drain_mode && full_write(fd, buf, len) < len) {
390 	    if (elt->downstream->must_drain) {
391 		g_debug("Error writing to fd %d: %s", fd, strerror(errno));
392 	    } else if (elt->downstream->ignore_broken_pipe && errno == EPIPE) {
393 	    } else {
394 		if (!elt->cancelled) {
395 		    xfer_cancel_with_error(elt,
396 			_("Error writing to fd %d: %s"), fd, strerror(errno));
397 		    xfer_cancel(elt->xfer);
398 		    wait_until_xfer_cancelled(elt->xfer);
399 		}
400 		amfree(buf);
401 		break;
402 	    }
403 	    elt->downstream->drain_mode = TRUE;
404 	}
405 	crc32_add((uint8_t *)buf, len, &elt->crc);
406 
407 	amfree(buf);
408     }
409 
410     if (elt->cancelled && elt->expect_eof)
411 	xfer_element_drain_buffers(elt->upstream);
412 
413     g_debug("xfer-dest-fd CRC: %08x:%lld",
414 	    crc32_finish(&elt->crc), (long long)elt->crc.size);
415 
416     /* close the fd we've been writing, as an EOF signal to downstream, and
417      * set it to -1 to avoid accidental re-use */
418     close_write_fd(self);
419 }
420 
421 static void
read_and_write(XferElementGlue * self)422 read_and_write(XferElementGlue *self)
423 {
424     XferElement *elt = XFER_ELEMENT(self);
425     /* dynamically allocate a buffer, in case this thread has
426      * a limited amount of stack allocated */
427     char *buf = g_malloc(GLUE_BUFFER_SIZE);
428     int rfd = get_read_fd(self);
429     int wfd = get_write_fd(self);
430 
431     g_debug("read_and_write: read from %d, write to %d", rfd, wfd);
432     while (!elt->cancelled) {
433 	size_t len;
434 
435 	/* read from upstream */
436 	len = full_read(rfd, buf, GLUE_BUFFER_SIZE);
437 	if (len < GLUE_BUFFER_SIZE) {
438 	    if (errno) {
439 		if (!elt->cancelled) {
440 		    xfer_cancel_with_error(elt,
441 			_("Error reading from fd %d: %s"), rfd, strerror(errno));
442 		    wait_until_xfer_cancelled(elt->xfer);
443 		}
444 		break;
445 	    } else if (len == 0) { /* we only count a zero-length read as EOF */
446 		break;
447 	    }
448 	}
449 
450 	/* write the buffer fully */
451 	if (!elt->downstream->drain_mode && full_write(wfd, buf, len) < len) {
452 	    if (elt->downstream->must_drain) {
453 		g_debug("Could not write to fd %d: %s",  wfd, strerror(errno));
454 	    } else if (elt->downstream->ignore_broken_pipe && errno == EPIPE) {
455 	    } else {
456 		if (!elt->cancelled) {
457 		    xfer_cancel_with_error(elt,
458 			_("Could not write to fd %d: %s"),
459 			wfd, strerror(errno));
460 		    wait_until_xfer_cancelled(elt->xfer);
461 		}
462 		break;
463 	    }
464 	}
465     }
466 
467     if (elt->cancelled && elt->expect_eof)
468 	xfer_element_drain_fd(rfd);
469 
470     /* close the read fd.  If it's not at EOF, then upstream will get EPIPE, which will hopefully
471      * kill it and complete the cancellation */
472     close_read_fd(self);
473 
474     /* close the fd we've been writing, as an EOF signal to downstream */
475     close_write_fd(self);
476 
477     amfree(buf);
478 }
479 
480 static void
read_and_push(XferElementGlue * self)481 read_and_push(
482     XferElementGlue *self)
483 {
484     XferElement *elt = XFER_ELEMENT(self);
485     int fd = get_read_fd(self);
486 
487     crc32_init(&elt->crc);
488     while (!elt->cancelled) {
489 	char *buf = g_malloc(GLUE_BUFFER_SIZE);
490 	size_t len;
491 
492 	/* read a buffer from upstream */
493 	len = full_read(fd, buf, GLUE_BUFFER_SIZE);
494 	if (len < GLUE_BUFFER_SIZE) {
495 	    if (errno) {
496 		if (!elt->cancelled) {
497 		    int saved_errno = errno;
498 		    xfer_cancel_with_error(elt,
499 			_("Error reading from fd %d: %s"), fd, strerror(saved_errno));
500 		    g_debug("element-glue: error reading from fd %d: %s",
501 			    fd, strerror(saved_errno));
502 		    wait_until_xfer_cancelled(elt->xfer);
503 		}
504                 amfree(buf);
505 		break;
506 	    } else if (len == 0) { /* we only count a zero-length read as EOF */
507 		amfree(buf);
508 		break;
509 	    }
510 	}
511 
512 	crc32_add((uint8_t *)buf, len, &elt->crc);
513 	xfer_element_push_buffer(elt->downstream, buf, len);
514     }
515 
516     if (elt->cancelled && elt->expect_eof)
517 	xfer_element_drain_fd(fd);
518 
519     /* send an EOF indication downstream */
520     xfer_element_push_buffer(elt->downstream, NULL, 0);
521 
522     /* close the read fd, since it's at EOF */
523     close_read_fd(self);
524 
525     g_debug("xfer-source-fd CRC: %08x:%lld",
526 	    crc32_finish(&elt->crc), (long long)elt->crc.size);
527 }
528 
529 static void
pull_and_push(XferElementGlue * self)530 pull_and_push(XferElementGlue *self)
531 {
532     XferElement *elt = XFER_ELEMENT(self);
533     gboolean eof_sent = FALSE;
534 
535     while (!elt->cancelled) {
536 	char *buf;
537 	size_t len;
538 
539 	/* get a buffer from upstream */
540 	buf = xfer_element_pull_buffer(elt->upstream, &len);
541 
542 	/* and push it downstream */
543 	xfer_element_push_buffer(elt->downstream, buf, len);
544 
545 	if (!buf) {
546 	    eof_sent = TRUE;
547 	    break;
548 	}
549     }
550 
551     if (elt->cancelled && elt->expect_eof)
552 	xfer_element_drain_buffers(elt->upstream);
553 
554     if (!eof_sent)
555 	xfer_element_push_buffer(elt->downstream, NULL, 0);
556 }
557 
558 static gpointer
worker_thread(gpointer data)559 worker_thread(
560     gpointer data)
561 {
562     XferElement *elt = XFER_ELEMENT(data);
563     XferElementGlue *self = XFER_ELEMENT_GLUE(data);
564 
565     switch (mech_pair(elt->input_mech, elt->output_mech)) {
566     case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
567 	read_and_write(self);
568 	break;
569 
570     case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
571     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
572 	read_and_push(self);
573 	break;
574 
575     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
576     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
577 	pull_and_write(self);
578 	break;
579 
580     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
581 	pull_and_push(self);
582 	break;
583 
584     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
585     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
586 	if ((self->output_data_socket = do_directtcp_connect(self,
587 				    elt->downstream->input_listen_addrs)) == -1)
588 	    break;
589 	self->write_fdp = &self->output_data_socket;
590 	read_and_write(self);
591 	break;
592 
593     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
594 	if ((self->output_data_socket = do_directtcp_connect(self,
595 				    elt->downstream->input_listen_addrs)) == -1)
596 	    break;
597 	self->write_fdp = &self->output_data_socket;
598 	pull_and_write(self);
599 	break;
600 
601     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
602     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
603 	if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
604 	    break;
605 	self->read_fdp = &self->input_data_socket;
606 	read_and_write(self);
607 	break;
608 
609     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
610 	if ((self->input_data_socket = do_directtcp_accept(self,
611 					    &self->input_listen_socket)) == -1)
612 	    break;
613 	self->read_fdp = &self->input_data_socket;
614 	read_and_push(self);
615 	break;
616 
617     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
618     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
619     case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
620     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
621     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
622     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
623     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
624     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
625     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
626     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
627     default:
628 	g_assert_not_reached();
629 	break;
630 
631     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
632     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
633 	if ((self->output_data_socket = do_directtcp_accept(self,
634 					    &self->output_listen_socket)) == -1)
635 	    break;
636 	self->write_fdp = &self->output_data_socket;
637 	read_and_write(self);
638 	break;
639 
640     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
641     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
642 	if ((self->input_data_socket = do_directtcp_connect(self,
643 				    elt->upstream->output_listen_addrs)) == -1)
644 	    break;
645 	self->read_fdp = &self->input_data_socket;
646 	read_and_write(self);
647 	break;
648 
649     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
650 	if ((self->input_data_socket = do_directtcp_connect(self,
651 				    elt->upstream->output_listen_addrs)) == -1)
652 	    break;
653 	self->read_fdp = &self->input_data_socket;
654 	read_and_push(self);
655 	break;
656 
657     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
658 	if ((self->output_data_socket = do_directtcp_accept(self,
659 					    &self->output_listen_socket)) == -1)
660 	    break;
661 	self->write_fdp = &self->output_data_socket;
662 	pull_and_write(self);
663 	break;
664 
665     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
666 	/* TODO: use async accept's here to avoid order dependency */
667 	if ((self->output_data_socket = do_directtcp_accept(self,
668 					    &self->output_listen_socket)) == -1)
669 	    break;
670 	self->write_fdp = &self->output_data_socket;
671 	if ((self->input_data_socket = do_directtcp_accept(self,
672 					    &self->input_listen_socket)) == -1)
673 	    break;
674 	self->read_fdp = &self->input_data_socket;
675 	read_and_write(self);
676 	break;
677 
678     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
679 	/* TODO: use async connects and select() to avoid order dependency here */
680 	if ((self->input_data_socket = do_directtcp_connect(self,
681 				    elt->upstream->output_listen_addrs)) == -1)
682 	    break;
683 	self->read_fdp = &self->input_data_socket;
684 	if ((self->output_data_socket = do_directtcp_connect(self,
685 				    elt->downstream->input_listen_addrs)) == -1)
686 	    break;
687 	self->write_fdp = &self->output_data_socket;
688 	read_and_write(self);
689 	break;
690     }
691 
692     send_xfer_done(self);
693 
694     return NULL;
695 }
696 
697 /*
698  * Implementation
699  */
700 
701 static gboolean
setup_impl(XferElement * elt)702 setup_impl(
703     XferElement *elt)
704 {
705     XferElementGlue *self = (XferElementGlue *)elt;
706     gboolean need_ring = FALSE;
707     gboolean need_listen_input = FALSE;
708     gboolean need_listen_output = FALSE;
709 
710     g_assert(elt->input_mech != XFER_MECH_NONE);
711     g_assert(elt->output_mech != XFER_MECH_NONE);
712     g_assert(elt->input_mech != elt->output_mech);
713 
714     self->read_fdp = NULL;
715     self->write_fdp = NULL;
716     self->on_push = PUSH_INVALID;
717     self->on_pull = PULL_INVALID;
718     self->need_thread = FALSE;
719 
720     switch (mech_pair(elt->input_mech, elt->output_mech)) {
721     case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
722 	/* thread will read from one fd and write to the other */
723 	self->read_fdp = &neighboring_element_fd;
724 	self->write_fdp = &neighboring_element_fd;
725 	self->need_thread = TRUE;
726 	break;
727 
728     case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
729 	/* thread will read from one fd and call push_buffer downstream */
730 	self->read_fdp = &neighboring_element_fd;
731 	self->need_thread = TRUE;
732 	break;
733 
734     case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
735 	self->read_fdp = &neighboring_element_fd;
736 	self->on_pull = PULL_FROM_FD;
737 	break;
738 
739     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
740 	/* thread will connect for output, then read from fd and write to the
741 	 * socket. */
742 	self->read_fdp = &neighboring_element_fd;
743 	self->need_thread = TRUE;
744 	break;
745 
746     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
747 	/* thread will accept output conn, then read from upstream and write to socket */
748 	self->read_fdp = &neighboring_element_fd;
749 	self->need_thread = TRUE;
750 	need_listen_output = TRUE;
751 	break;
752 
753     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
754 	make_pipe(self);
755 	g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
756 	self->pipe[1] = -1; /* upstream will close this for us */
757 	g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
758 	self->pipe[0] = -1; /* downstream will close this for us */
759 	break;
760 
761     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
762 	/* thread will read from pipe and call downstream's push_buffer */
763 	make_pipe(self);
764 	g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
765 	self->pipe[1] = -1; /* upstream will close this for us */
766 	self->read_fdp = &self->pipe[0];
767 	self->need_thread = TRUE;
768 	break;
769 
770     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
771 	make_pipe(self);
772 	g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
773 	self->pipe[1] = -1; /* upstream will close this for us */
774 	self->on_pull = PULL_FROM_FD;
775 	self->read_fdp = &self->pipe[0];
776 	break;
777 
778     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
779 	/* thread will connect for output, then read from pipe and write to socket */
780 	make_pipe(self);
781 	g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
782 	self->pipe[1] = -1; /* upstream will close this for us */
783 	self->read_fdp = &self->pipe[0];
784 	self->need_thread = TRUE;
785 	break;
786 
787     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
788 	/* thread will accept output conn, then read from pipe and write to socket */
789 	make_pipe(self);
790 	g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
791 	self->pipe[1] = -1; /* upstream will close this for us */
792 	self->read_fdp = &self->pipe[0];
793 	self->need_thread = TRUE;
794 	need_listen_output = TRUE;
795 	break;
796 
797     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
798 	make_pipe(self);
799 	g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
800 	self->pipe[0] = -1; /* downstream will close this for us */
801 	self->on_push = PUSH_TO_FD;
802 	self->write_fdp = &self->pipe[1];
803 	break;
804 
805     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
806 	self->on_push = PUSH_TO_FD;
807 	self->write_fdp = &neighboring_element_fd;
808 	break;
809 
810     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
811 	self->on_push = PUSH_TO_RING_BUFFER;
812 	self->on_pull = PULL_FROM_RING_BUFFER;
813 	need_ring = TRUE;
814 	break;
815 
816     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
817 	/* push will connect for output first */
818 	self->on_push = PUSH_TO_FD | PUSH_CONNECT_FIRST;
819 	break;
820 
821     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
822 	/* push will accept for output first */
823 	self->on_push = PUSH_TO_FD | PUSH_ACCEPT_FIRST;
824 	need_listen_output = TRUE;
825 	break;
826 
827     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
828 	/* thread will pull from upstream and write to pipe */
829 	make_pipe(self);
830 	g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
831 	self->pipe[0] = -1; /* downstream will close this for us */
832 	self->write_fdp = &self->pipe[1];
833 	self->need_thread = TRUE;
834 	break;
835 
836     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
837 	/* thread will pull from upstream and write to downstream */
838 	self->write_fdp = &neighboring_element_fd;
839 	self->need_thread = TRUE;
840 	break;
841 
842     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
843 	/* thread will pull from upstream and push to downstream */
844 	self->need_thread = TRUE;
845 	break;
846 
847     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
848 	/* thread will connect for output, then pull from upstream and write to socket */
849 	self->need_thread = TRUE;
850 	break;
851 
852     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
853 	/* thread will accept for output, then pull from upstream and write to socket */
854 	self->need_thread = TRUE;
855 	need_listen_output = TRUE;
856 	break;
857 
858     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
859 	/* thread will accept for input, then read from socket and write to pipe */
860 	make_pipe(self);
861 	g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
862 	self->pipe[0] = -1; /* downstream will close this for us */
863 	self->write_fdp = &self->pipe[1];
864 	self->need_thread = TRUE;
865 	need_listen_input = TRUE;
866 	break;
867 
868     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
869 	/* thread will accept for input, then read from socket and write to downstream */
870 	self->write_fdp = &neighboring_element_fd;
871 	self->need_thread = TRUE;
872 	need_listen_input = TRUE;
873 	break;
874 
875     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
876 	/* thread will accept for input, then read from socket and push downstream */
877 	self->need_thread = TRUE;
878 	need_listen_input = TRUE;
879 	break;
880 
881     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
882 	/* first pull will accept for input, then read from socket */
883 	self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
884 	need_listen_input = TRUE;
885 	break;
886 
887     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
888 	/* thread will accept on both sides, then copy from socket to socket */
889 	self->need_thread = TRUE;
890 	need_listen_input = TRUE;
891 	need_listen_output = TRUE;
892 	break;
893 
894     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
895 	/* thread will connect for input, then read from socket and write to pipe */
896 	make_pipe(self);
897 	g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
898 	self->pipe[0] = -1; /* downstream will close this for us */
899 	self->write_fdp = &self->pipe[1];
900 	self->need_thread = TRUE;
901 	break;
902 
903     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
904 	/* thread will connect for input, then read from socket and write to downstream */
905 	self->write_fdp = &neighboring_element_fd;
906 	self->need_thread = TRUE;
907 	break;
908 
909     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
910 	/* thread will connect for input, then read from socket and push downstream */
911 	self->need_thread = TRUE;
912 	break;
913 
914     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
915 	/* first pull will connect for input, then read from socket */
916 	self->on_pull = PULL_FROM_FD | PULL_CONNECT_FIRST;
917 	break;
918 
919     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
920 	/* thread will connect on both sides, then copy from socket to socket */
921 	self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
922 	self->need_thread = TRUE;
923 	break;
924 
925     default:
926 	g_assert_not_reached();
927 	break;
928     }
929 
930     /* set up ring if desired */
931     if (need_ring) {
932 	self->ring = g_try_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
933 	if (self->ring == NULL) {
934 	    xfer_cancel_with_error(elt, "Can't allocate memory for ring");
935 	    return FALSE;
936 	}
937 	self->ring_used_sem = amsemaphore_new_with_value(0);
938 	self->ring_free_sem = amsemaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
939     }
940 
941     if (need_listen_input) {
942 	if (!do_directtcp_listen(elt,
943 		    &self->input_listen_socket, &elt->input_listen_addrs))
944 	    return FALSE;
945     }
946     if (need_listen_output) {
947 	if (!do_directtcp_listen(elt,
948 		    &self->output_listen_socket, &elt->output_listen_addrs))
949 	    return FALSE;
950     }
951 
952     return TRUE;
953 }
954 
955 static gboolean
start_impl(XferElement * elt)956 start_impl(
957     XferElement *elt)
958 {
959     XferElementGlue *self = (XferElementGlue *)elt;
960 
961     if (self->need_thread)
962 	self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL);
963 
964     /* we're active if we have a thread that will eventually die */
965     return self->need_thread;
966 }
967 
968 static gpointer
pull_buffer_impl(XferElement * elt,size_t * size)969 pull_buffer_impl(
970     XferElement *elt,
971     size_t *size)
972 {
973     XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
974 
975     /* accept first, if required */
976     if (self->on_pull & PULL_ACCEPT_FIRST) {
977 	/* don't accept the next time around */
978 	self->on_pull &= ~PULL_ACCEPT_FIRST;
979 
980 	if (elt->cancelled) {
981 	    *size = 0;
982 	    return NULL;
983 	}
984 
985 	if ((self->input_data_socket = do_directtcp_accept(self,
986 					    &self->input_listen_socket)) == -1) {
987 	    /* do_directtcp_accept already signalled an error; xfer
988 	     * is cancelled */
989 	    *size = 0;
990 	    return NULL;
991 	}
992 
993 	/* read from this new socket */
994 	self->read_fdp = &self->input_data_socket;
995     }
996 
997     /* or connect first, if required */
998     if (self->on_pull & PULL_CONNECT_FIRST) {
999 	/* don't connect the next time around */
1000 	self->on_pull &= ~PULL_CONNECT_FIRST;
1001 
1002 	if (elt->cancelled) {
1003 	    *size = 0;
1004 	    return NULL;
1005 	}
1006 
1007 	if ((self->input_data_socket = do_directtcp_connect(self,
1008 				    elt->upstream->output_listen_addrs)) == -1) {
1009 	    /* do_directtcp_connect already signalled an error; xfer
1010 	     * is cancelled */
1011 	    *size = 0;
1012 	    return NULL;
1013 	}
1014 
1015 	/* read from this new socket */
1016 	self->read_fdp = &self->input_data_socket;
1017     }
1018 
1019     switch (self->on_pull) {
1020 	case PULL_FROM_RING_BUFFER: {
1021 	    gpointer buf;
1022 
1023 	    if (elt->cancelled) {
1024 		/* the finalize method will empty the ring buffer */
1025 		*size = 0;
1026 		return NULL;
1027 	    }
1028 
1029 	    /* make sure there's at least one element available */
1030 	    amsemaphore_down(self->ring_used_sem);
1031 
1032 	    /* get it */
1033 	    buf = self->ring[self->ring_tail].buf;
1034 	    *size = self->ring[self->ring_tail].size;
1035 	    self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
1036 
1037 	    /* and mark this element as free to be overwritten */
1038 	    amsemaphore_up(self->ring_free_sem);
1039 
1040 	    return buf;
1041 	}
1042 
1043 	case PULL_FROM_FD: {
1044 	    int fd = get_read_fd(self);
1045 	    char *buf;
1046 	    ssize_t len;
1047 
1048 	    /* if the fd is already closed, it's possible upstream bailed out
1049 	     * so quickly that we didn't even get a look at the fd */
1050 	    if (elt->cancelled || fd == -1) {
1051 		if (fd != -1) {
1052 		    if (elt->expect_eof)
1053 			xfer_element_drain_fd(fd);
1054 
1055 		    close_read_fd(self);
1056 		}
1057 
1058 		*size = 0;
1059 		return NULL;
1060 	    }
1061 
1062 	    buf = g_malloc(GLUE_BUFFER_SIZE);
1063 
1064 	    /* read from upstream */
1065 	    len = full_read(fd, buf, GLUE_BUFFER_SIZE);
1066 	    if (len < GLUE_BUFFER_SIZE) {
1067 		if (errno) {
1068 		    if (!elt->cancelled) {
1069 			xfer_cancel_with_error(elt,
1070 			    _("Error reading from fd %d: %s"), fd, strerror(errno));
1071 			wait_until_xfer_cancelled(elt->xfer);
1072 		    }
1073 
1074 		    /* return an EOF */
1075 		    amfree(buf);
1076 		    len = 0;
1077 
1078 		    /* and finish off the upstream */
1079 		    if (elt->expect_eof) {
1080 			xfer_element_drain_fd(fd);
1081 		    }
1082 		    close_read_fd(self);
1083 		} else if (len == 0) {
1084 		    /* EOF */
1085 		    g_free(buf);
1086 		    buf = NULL;
1087 		    *size = 0;
1088 
1089 		    /* signal EOF to downstream */
1090 		    close_read_fd(self);
1091 		}
1092 	    }
1093 
1094 	    *size = (size_t)len;
1095 
1096 	    return buf;
1097 	}
1098 
1099 	default:
1100 	case PULL_INVALID:
1101 	    g_assert_not_reached();
1102 	    return NULL;
1103     }
1104 }
1105 
1106 static void
push_buffer_impl(XferElement * elt,gpointer buf,size_t len)1107 push_buffer_impl(
1108     XferElement *elt,
1109     gpointer buf,
1110     size_t len)
1111 {
1112     XferElementGlue *self = (XferElementGlue *)elt;
1113 
1114     /* accept first, if required */
1115     if (self->on_push & PUSH_ACCEPT_FIRST) {
1116 	/* don't accept the next time around */
1117 	self->on_push &= ~PUSH_ACCEPT_FIRST;
1118 
1119 	if (elt->cancelled) {
1120 	    return;
1121 	}
1122 
1123 	if ((self->output_data_socket = do_directtcp_accept(self,
1124 					    &self->output_listen_socket)) == -1) {
1125 	    /* do_directtcp_accept already signalled an error; xfer
1126 	     * is cancelled */
1127 	    return;
1128 	}
1129 
1130 	/* write to this new socket */
1131 	self->write_fdp = &self->output_data_socket;
1132     }
1133 
1134     /* or connect first, if required */
1135     if (self->on_push & PUSH_CONNECT_FIRST) {
1136 	/* don't accept the next time around */
1137 	self->on_push &= ~PUSH_CONNECT_FIRST;
1138 
1139 	if (elt->cancelled) {
1140 	    return;
1141 	}
1142 
1143 	if ((self->output_data_socket = do_directtcp_connect(self,
1144 				    elt->downstream->input_listen_addrs)) == -1) {
1145 	    /* do_directtcp_connect already signalled an error; xfer
1146 	     * is cancelled */
1147 	    return;
1148 	}
1149 
1150 	/* read from this new socket */
1151 	self->write_fdp = &self->output_data_socket;
1152     }
1153 
1154     switch (self->on_push) {
1155 	case PUSH_TO_RING_BUFFER:
1156 	    /* just drop packets if the transfer has been cancelled */
1157 	    if (elt->cancelled) {
1158 		amfree(buf);
1159 		return;
1160 	    }
1161 
1162 	    /* make sure there's at least one element free */
1163 	    amsemaphore_down(self->ring_free_sem);
1164 
1165 	    /* set it */
1166 	    self->ring[self->ring_head].buf = buf;
1167 	    self->ring[self->ring_head].size = len;
1168 	    self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
1169 
1170 	    /* and mark this element as available for reading */
1171 	    amsemaphore_up(self->ring_used_sem);
1172 
1173 	    return;
1174 
1175 	case PUSH_TO_FD: {
1176 	    int fd = get_write_fd(self);
1177 
1178 	    /* if the fd is already closed, it's possible upstream bailed out
1179 	     * so quickly that we didn't even get a look at the fd.  In this
1180 	     * case we can assume the xfer has been cancelled and just discard
1181 	     * the data. */
1182 	    if (fd == -1)
1183 		return;
1184 
1185 	    if (elt->cancelled) {
1186 		if (!elt->expect_eof || !buf) {
1187 		    close_write_fd(self);
1188 
1189 		    /* hack to ensure we won't close the fd again, if we get another push */
1190 		    elt->expect_eof = TRUE;
1191 		}
1192 
1193 		amfree(buf);
1194 
1195 		return;
1196 	    }
1197 
1198 	    /* write the full buffer to the fd, or close on EOF */
1199 	    if (buf) {
1200 		if (!elt->downstream->drain_mode &&
1201 		    full_write(fd, buf, len) < len) {
1202 		    if (elt->downstream->must_drain) {
1203 			g_debug("Error writing to fd %d: %s",
1204 				fd, strerror(errno));
1205 		    } else if (elt->downstream->ignore_broken_pipe &&
1206 			       errno == EPIPE) {
1207 		    } else {
1208 			if (!elt->cancelled) {
1209 			    xfer_cancel_with_error(elt,
1210 				_("Error writing to fd %d: %s"),
1211 				fd, strerror(errno));
1212 			    wait_until_xfer_cancelled(elt->xfer);
1213 			}
1214 			/* nothing special to do to handle a cancellation */
1215 		    }
1216 		    elt->downstream->drain_mode = TRUE;
1217 		}
1218 		amfree(buf);
1219 	    } else {
1220 		close_write_fd(self);
1221 	    }
1222 
1223 	    return;
1224 	}
1225 
1226 	default:
1227 	case PUSH_INVALID:
1228 	    g_assert_not_reached();
1229 	    break;
1230     }
1231 }
1232 
1233 static void
instance_init(XferElementGlue * self)1234 instance_init(
1235     XferElementGlue *self)
1236 {
1237     XferElement *elt = (XferElement *)self;
1238     elt->can_generate_eof = TRUE;
1239     self->pipe[0] = self->pipe[1] = -1;
1240     self->input_listen_socket = -1;
1241     self->output_listen_socket = -1;
1242     self->input_data_socket = -1;
1243     self->output_data_socket = -1;
1244     self->read_fd = -1;
1245     self->write_fd = -1;
1246 }
1247 
1248 static void
finalize_impl(GObject * obj_self)1249 finalize_impl(
1250     GObject * obj_self)
1251 {
1252     XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
1253 
1254     /* first make sure the worker thread has finished up */
1255     if (self->thread)
1256 	g_thread_join(self->thread);
1257 
1258     /* close our pipes and fd's if they're still open */
1259     if (self->pipe[0] != -1) close(self->pipe[0]);
1260     if (self->pipe[1] != -1) close(self->pipe[1]);
1261     if (self->input_data_socket != -1) close(self->input_data_socket);
1262     if (self->output_data_socket != -1) close(self->output_data_socket);
1263     if (self->input_listen_socket != -1) close(self->input_listen_socket);
1264     if (self->output_listen_socket != -1) close(self->output_listen_socket);
1265     if (self->read_fd != -1) close(self->read_fd);
1266     if (self->write_fd != -1) close(self->write_fd);
1267 
1268     if (self->ring) {
1269 	/* empty the ring buffer, ignoring syncronization issues */
1270 	while (self->ring_used_sem->value) {
1271 	    if (self->ring[self->ring_tail].buf)
1272 		amfree(self->ring[self->ring_tail].buf);
1273 	    self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
1274 	}
1275 
1276 	amfree(self->ring);
1277 	amsemaphore_free(self->ring_used_sem);
1278 	amsemaphore_free(self->ring_free_sem);
1279     }
1280 
1281     /* chain up */
1282     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
1283 }
1284 
1285 static xfer_element_mech_pair_t _pairs[] = {
1286     { XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1287     { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1288     { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1289     { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1290     { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1291 
1292     { XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0) }, /* pipe */
1293     { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* pipe + read and call*/
1294     { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* pipe + read on demand */
1295     { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* pipe + splice or copy*/
1296     { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1297 
1298     { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand + pipe */
1299     { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1300     { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0) }, /* async queue */
1301     { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1302     { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1303 
1304     { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write + pipe */
1305     { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1306     { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1) }, /* call and call */
1307     { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1308     { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1309 
1310     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1311     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1312     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1313     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1314     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1315 
1316     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1317     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1318     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1319     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1320     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy  */
1321 
1322     /* terminator */
1323     { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1324 };
1325 xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;
1326 
1327 static void
class_init(XferElementGlueClass * selfc)1328 class_init(
1329     XferElementGlueClass * selfc)
1330 {
1331     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
1332     GObjectClass *goc = G_OBJECT_CLASS(selfc);
1333 
1334     klass->setup = setup_impl;
1335     klass->start = start_impl;
1336     klass->push_buffer = push_buffer_impl;
1337     klass->pull_buffer = pull_buffer_impl;
1338 
1339     klass->perl_class = "Amanda::Xfer::Element::Glue";
1340     klass->mech_pairs = xfer_element_glue_mech_pairs;
1341 
1342     goc->finalize = finalize_impl;
1343 
1344     parent_class = g_type_class_peek_parent(selfc);
1345 }
1346 
1347 GType
xfer_element_glue_get_type(void)1348 xfer_element_glue_get_type (void)
1349 {
1350     static GType type = 0;
1351 
1352     if G_UNLIKELY(type == 0) {
1353         static const GTypeInfo info = {
1354             sizeof (XferElementGlueClass),
1355             (GBaseInitFunc) NULL,
1356             (GBaseFinalizeFunc) NULL,
1357             (GClassInitFunc) class_init,
1358             (GClassFinalizeFunc) NULL,
1359             NULL /* class_data */,
1360             sizeof (XferElementGlue),
1361             0 /* n_preallocs */,
1362             (GInstanceInitFunc) instance_init,
1363             NULL
1364         };
1365 
1366         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferElementGlue", &info, 0);
1367     }
1368 
1369     return type;
1370 }
1371 
1372 /* create an element of this class; prototype is in xfer-element.h */
1373 XferElement *
xfer_element_glue(void)1374 xfer_element_glue(void)
1375 {
1376     XferElementGlue *self = (XferElementGlue *)g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL);
1377     XferElement *elt = XFER_ELEMENT(self);
1378 
1379     return elt;
1380 }
1381