1 /*
2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2008-2013 Zmanda, Inc. All Rights Reserved.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 *
19 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21 */
22
23 #include "amanda.h"
24 #include "amxfer.h"
25 #include "element-glue.h"
26 #include "directtcp.h"
27 #include "util.h"
28 #include "sockaddr-util.h"
29 #include "stream.h"
30 #include "debug.h"
31
32 /*
33 * Instance definition
34 */
35
36 typedef struct XferElementGlue_ {
37 XferElement __parent__;
38
39 /* instructions to push_buffer_impl */
40 enum {
41 PUSH_TO_RING_BUFFER,
42 PUSH_TO_FD, /* write to write_fd */
43 PUSH_INVALID,
44
45 PUSH_ACCEPT_FIRST = (1 << 16),
46 PUSH_CONNECT_FIRST = (2 << 16),
47 } on_push;
48
49 /* instructions to pull_buffer_impl */
50 enum {
51 PULL_FROM_RING_BUFFER,
52 PULL_FROM_FD, /* read from read_fd */
53 PULL_INVALID,
54
55 PULL_ACCEPT_FIRST = (1 << 16),
56 PULL_CONNECT_FIRST = (2 << 16),
57 } on_pull;
58
59 int *write_fdp;
60 int *read_fdp;
61
62 gboolean need_thread;
63
64 /* the stuff we might use, depending on what flavor of glue we're
65 * providing.. */
66 int pipe[2];
67 int input_listen_socket, output_listen_socket;
68 int input_data_socket, output_data_socket;
69 int read_fd, write_fd;
70
71 /* a ring buffer of ptr/size pairs with semaphores */
72 struct { gpointer buf; size_t size; } *ring;
73 amsemaphore_t *ring_used_sem, *ring_free_sem;
74 gint ring_head, ring_tail;
75
76 GThread *thread;
77 GThreadFunc threadfunc;
78 } XferElementGlue;
79
80 /*
81 * Class definition
82 */
83
84 typedef struct XferElementGlueClass_ {
85 XferElementClass __parent__;
86 } XferElementGlueClass;
87
88 static GObjectClass *parent_class = NULL;
89
90 /*
91 * Utility functions, etc.
92 */
93
94 static void
make_pipe(XferElementGlue * self)95 make_pipe(
96 XferElementGlue *self)
97 {
98 if (pipe(self->pipe) < 0)
99 g_critical(_("Could not create pipe: %s"), strerror(errno));
100 }
101
102 static void
send_xfer_done(XferElementGlue * self)103 send_xfer_done(
104 XferElementGlue *self)
105 {
106 xfer_queue_message(XFER_ELEMENT(self)->xfer,
107 xmsg_new((XferElement *)self, XMSG_DONE, 0));
108 }
109
110 static gboolean
do_directtcp_listen(XferElement * elt,int * sockp,DirectTCPAddr ** addrsp)111 do_directtcp_listen(
112 XferElement *elt,
113 int *sockp,
114 DirectTCPAddr **addrsp)
115 {
116 int sock;
117 sockaddr_union data_addr;
118 DirectTCPAddr *addrs;
119 socklen_t len;
120 struct addrinfo *res;
121 struct addrinfo *res_addr;
122 sockaddr_union *addr = NULL;
123
124 if (resolve_hostname("localhost", 0, &res, NULL) != 0) {
125 xfer_cancel_with_error(elt, "resolve_hostname(): %s", strerror(errno));
126 return FALSE;
127 }
128 for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) {
129 if (res_addr->ai_family == AF_INET) {
130 addr = (sockaddr_union *)res_addr->ai_addr;
131 break;
132 }
133 }
134 if (!addr) {
135 addr = (sockaddr_union *)res->ai_addr;
136 }
137
138 sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0);
139 if (sock < 0) {
140 xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
141 return FALSE;
142 }
143
144 len = SS_LEN(addr);
145 if (bind(sock, (struct sockaddr *)addr, len) != 0) {
146 xfer_cancel_with_error(elt, "bind(): %s", strerror(errno));
147 freeaddrinfo(res);
148 return FALSE;
149 }
150
151 if (listen(sock, 1) < 0) {
152 xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
153 return FALSE;
154 }
155
156 /* TODO: which addresses should this display? all ifaces? localhost? */
157 len = sizeof(data_addr);
158 if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0)
159 error("getsockname(): %s", strerror(errno));
160
161 addrs = g_new0(DirectTCPAddr, 2);
162 copy_sockaddr(&addrs[0], &data_addr);
163 *addrsp = addrs;
164
165 return TRUE;
166 }
167
168 static gboolean
prolong_accept(gpointer data)169 prolong_accept(
170 gpointer data)
171 {
172 return !XFER_ELEMENT(data)->cancelled;
173 }
174
175 static int
do_directtcp_accept(XferElementGlue * self,int * socketp)176 do_directtcp_accept(
177 XferElementGlue *self,
178 int *socketp)
179 {
180 int sock;
181 time_t timeout_time = time(NULL) + 60;
182 g_assert(*socketp != -1);
183
184 if ((sock = interruptible_accept(*socketp, NULL, NULL,
185 prolong_accept, self, timeout_time)) == -1) {
186 /* if the accept was interrupted due to a cancellation, then do not
187 * add a further error message */
188 if (errno == 0 && XFER_ELEMENT(self)->cancelled)
189 return -1;
190
191 xfer_cancel_with_error(XFER_ELEMENT(self),
192 _("Error accepting incoming connection: %s"), strerror(errno));
193 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
194 return -1;
195 }
196
197 /* close the listening socket now, for good measure */
198 close(*socketp);
199 *socketp = -1;
200
201 g_debug("do_directtcp_accept: %d", sock);
202
203 return sock;
204 }
205
206 static int
do_directtcp_connect(XferElementGlue * self,DirectTCPAddr * addrs)207 do_directtcp_connect(
208 XferElementGlue *self,
209 DirectTCPAddr *addrs)
210 {
211 XferElement *elt = XFER_ELEMENT(self);
212 sockaddr_union addr;
213 int sock;
214 #ifdef WORKING_IPV6
215 char strsockaddr[INET6_ADDRSTRLEN + 20];
216 #else
217 char strsockaddr[INET_ADDRSTRLEN + 20];
218 #endif
219
220 if (!addrs) {
221 g_debug("element-glue got no directtcp addresses to connect to!");
222 if (!elt->cancelled) {
223 xfer_cancel_with_error(elt,
224 "%s got no directtcp addresses to connect to",
225 xfer_element_repr(elt));
226 }
227 goto cancel_wait;
228 }
229
230 /* set up the sockaddr -- IPv4 only */
231 copy_sockaddr(&addr, addrs);
232
233 str_sockaddr_r(&addr, strsockaddr, sizeof(strsockaddr));
234
235 if (strncmp(strsockaddr,"255.255.255.255:", 16) == 0) {
236 char buffer[32770];
237 char *s;
238 int size;
239 char *data_host;
240 int data_port;
241
242 g_debug("do_directtcp_connect making indirect data connection to %s",
243 strsockaddr);
244 data_port = SU_GET_PORT(&addr);
245 sock = stream_client("localhost", data_port,
246 STREAM_BUFSIZE, 0, NULL, 0);
247 if (sock < 0) {
248 xfer_cancel_with_error(elt, "stream_client(): %s", strerror(errno));
249 goto cancel_wait;
250 }
251 size = full_read(sock, buffer, 32768);
252 if (size < 0 ) {
253 xfer_cancel_with_error(elt, "failed to read from indirecttcp: %s",
254 strerror(errno));
255 goto cancel_wait;
256 }
257 close(sock);
258 buffer[size++] = ' ';
259 buffer[size] = '\0';
260 if ((s = strchr(buffer, ':')) == NULL) {
261 xfer_cancel_with_error(elt,
262 "Failed to parse indirect data stream: %s",
263 buffer);
264 goto cancel_wait;
265 }
266 *s++ = '\0';
267 data_host = buffer;
268 data_port = atoi(s);
269
270 str_to_sockaddr(data_host, &addr);
271 SU_SET_PORT(&addr, data_port);
272
273 str_sockaddr_r(&addr, strsockaddr, sizeof(strsockaddr));
274 }
275
276 sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
277
278 g_debug("do_directtcp_connect making data connection to %s", strsockaddr);
279
280 if (sock < 0) {
281 xfer_cancel_with_error(elt,
282 "socket(): %s", strerror(errno));
283 goto cancel_wait;
284 }
285 if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
286 xfer_cancel_with_error(elt,
287 "connect(): %s", strerror(errno));
288 goto cancel_wait;
289 }
290
291 g_debug("do_directtcp_connect: connected to %s, fd %d", strsockaddr, sock);
292
293 return sock;
294
295 cancel_wait:
296 wait_until_xfer_cancelled(elt->xfer);
297 return -1;
298 }
299
300 #define GLUE_BUFFER_SIZE 32768
301 #define GLUE_RING_BUFFER_SIZE 32
302
303 #define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
304
305 /*
306 * fd handling
307 */
308
309 /* if self->read_fdp or self->write_fdp are pointing to this integer, then they
310 * should be redirected to point to the upstream's output_fd or downstream's
311 * input_fd, respectively, at the first call to get_read_fd or get_write_fd,
312 * respectively. */
313 static int neighboring_element_fd = -1;
314
315 #define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd)
316 static int
_get_read_fd(XferElementGlue * self)317 _get_read_fd(XferElementGlue *self)
318 {
319 if (!self->read_fdp)
320 return -1; /* shouldn't happen.. */
321
322 if (self->read_fdp == &neighboring_element_fd) {
323 XferElement *elt = XFER_ELEMENT(self);
324 self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1);
325 } else {
326 self->read_fd = *self->read_fdp;
327 *self->read_fdp = -1;
328 }
329 self->read_fdp = NULL;
330 return self->read_fd;
331 }
332
333 #define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd)
334 static int
_get_write_fd(XferElementGlue * self)335 _get_write_fd(XferElementGlue *self)
336 {
337 if (!self->write_fdp)
338 return -1; /* shouldn't happen.. */
339
340 if (self->write_fdp == &neighboring_element_fd) {
341 XferElement *elt = XFER_ELEMENT(self);
342 self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1);
343 } else {
344 self->write_fd = *self->write_fdp;
345 *self->write_fdp = -1;
346 }
347 self->write_fdp = NULL;
348 return self->write_fd;
349 }
350
351 static int
close_read_fd(XferElementGlue * self)352 close_read_fd(XferElementGlue *self)
353 {
354 int fd = get_read_fd(self);
355 self->read_fd = -1;
356 return close(fd);
357 }
358
359 static int
close_write_fd(XferElementGlue * self)360 close_write_fd(XferElementGlue *self)
361 {
362 int fd = get_write_fd(self);
363 self->write_fd = -1;
364 return close(fd);
365 }
366
367 /*
368 * Worker thread utility functions
369 */
370
371 static void
pull_and_write(XferElementGlue * self)372 pull_and_write(XferElementGlue *self)
373 {
374 XferElement *elt = XFER_ELEMENT(self);
375 int fd = get_write_fd(self);
376 self->write_fdp = NULL;
377 crc32_init(&elt->crc);
378
379 while (!elt->cancelled) {
380 size_t len;
381 char *buf;
382
383 /* get a buffer from upstream */
384 buf = xfer_element_pull_buffer(elt->upstream, &len);
385 if (!buf)
386 break;
387
388 /* write it */
389 if (!elt->downstream->drain_mode && full_write(fd, buf, len) < len) {
390 if (elt->downstream->must_drain) {
391 g_debug("Error writing to fd %d: %s", fd, strerror(errno));
392 } else if (elt->downstream->ignore_broken_pipe && errno == EPIPE) {
393 } else {
394 if (!elt->cancelled) {
395 xfer_cancel_with_error(elt,
396 _("Error writing to fd %d: %s"), fd, strerror(errno));
397 xfer_cancel(elt->xfer);
398 wait_until_xfer_cancelled(elt->xfer);
399 }
400 amfree(buf);
401 break;
402 }
403 elt->downstream->drain_mode = TRUE;
404 }
405 crc32_add((uint8_t *)buf, len, &elt->crc);
406
407 amfree(buf);
408 }
409
410 if (elt->cancelled && elt->expect_eof)
411 xfer_element_drain_buffers(elt->upstream);
412
413 g_debug("xfer-dest-fd CRC: %08x:%lld",
414 crc32_finish(&elt->crc), (long long)elt->crc.size);
415
416 /* close the fd we've been writing, as an EOF signal to downstream, and
417 * set it to -1 to avoid accidental re-use */
418 close_write_fd(self);
419 }
420
421 static void
read_and_write(XferElementGlue * self)422 read_and_write(XferElementGlue *self)
423 {
424 XferElement *elt = XFER_ELEMENT(self);
425 /* dynamically allocate a buffer, in case this thread has
426 * a limited amount of stack allocated */
427 char *buf = g_malloc(GLUE_BUFFER_SIZE);
428 int rfd = get_read_fd(self);
429 int wfd = get_write_fd(self);
430
431 g_debug("read_and_write: read from %d, write to %d", rfd, wfd);
432 while (!elt->cancelled) {
433 size_t len;
434
435 /* read from upstream */
436 len = full_read(rfd, buf, GLUE_BUFFER_SIZE);
437 if (len < GLUE_BUFFER_SIZE) {
438 if (errno) {
439 if (!elt->cancelled) {
440 xfer_cancel_with_error(elt,
441 _("Error reading from fd %d: %s"), rfd, strerror(errno));
442 wait_until_xfer_cancelled(elt->xfer);
443 }
444 break;
445 } else if (len == 0) { /* we only count a zero-length read as EOF */
446 break;
447 }
448 }
449
450 /* write the buffer fully */
451 if (!elt->downstream->drain_mode && full_write(wfd, buf, len) < len) {
452 if (elt->downstream->must_drain) {
453 g_debug("Could not write to fd %d: %s", wfd, strerror(errno));
454 } else if (elt->downstream->ignore_broken_pipe && errno == EPIPE) {
455 } else {
456 if (!elt->cancelled) {
457 xfer_cancel_with_error(elt,
458 _("Could not write to fd %d: %s"),
459 wfd, strerror(errno));
460 wait_until_xfer_cancelled(elt->xfer);
461 }
462 break;
463 }
464 }
465 }
466
467 if (elt->cancelled && elt->expect_eof)
468 xfer_element_drain_fd(rfd);
469
470 /* close the read fd. If it's not at EOF, then upstream will get EPIPE, which will hopefully
471 * kill it and complete the cancellation */
472 close_read_fd(self);
473
474 /* close the fd we've been writing, as an EOF signal to downstream */
475 close_write_fd(self);
476
477 amfree(buf);
478 }
479
480 static void
read_and_push(XferElementGlue * self)481 read_and_push(
482 XferElementGlue *self)
483 {
484 XferElement *elt = XFER_ELEMENT(self);
485 int fd = get_read_fd(self);
486
487 crc32_init(&elt->crc);
488 while (!elt->cancelled) {
489 char *buf = g_malloc(GLUE_BUFFER_SIZE);
490 size_t len;
491
492 /* read a buffer from upstream */
493 len = full_read(fd, buf, GLUE_BUFFER_SIZE);
494 if (len < GLUE_BUFFER_SIZE) {
495 if (errno) {
496 if (!elt->cancelled) {
497 int saved_errno = errno;
498 xfer_cancel_with_error(elt,
499 _("Error reading from fd %d: %s"), fd, strerror(saved_errno));
500 g_debug("element-glue: error reading from fd %d: %s",
501 fd, strerror(saved_errno));
502 wait_until_xfer_cancelled(elt->xfer);
503 }
504 amfree(buf);
505 break;
506 } else if (len == 0) { /* we only count a zero-length read as EOF */
507 amfree(buf);
508 break;
509 }
510 }
511
512 crc32_add((uint8_t *)buf, len, &elt->crc);
513 xfer_element_push_buffer(elt->downstream, buf, len);
514 }
515
516 if (elt->cancelled && elt->expect_eof)
517 xfer_element_drain_fd(fd);
518
519 /* send an EOF indication downstream */
520 xfer_element_push_buffer(elt->downstream, NULL, 0);
521
522 /* close the read fd, since it's at EOF */
523 close_read_fd(self);
524
525 g_debug("xfer-source-fd CRC: %08x:%lld",
526 crc32_finish(&elt->crc), (long long)elt->crc.size);
527 }
528
529 static void
pull_and_push(XferElementGlue * self)530 pull_and_push(XferElementGlue *self)
531 {
532 XferElement *elt = XFER_ELEMENT(self);
533 gboolean eof_sent = FALSE;
534
535 while (!elt->cancelled) {
536 char *buf;
537 size_t len;
538
539 /* get a buffer from upstream */
540 buf = xfer_element_pull_buffer(elt->upstream, &len);
541
542 /* and push it downstream */
543 xfer_element_push_buffer(elt->downstream, buf, len);
544
545 if (!buf) {
546 eof_sent = TRUE;
547 break;
548 }
549 }
550
551 if (elt->cancelled && elt->expect_eof)
552 xfer_element_drain_buffers(elt->upstream);
553
554 if (!eof_sent)
555 xfer_element_push_buffer(elt->downstream, NULL, 0);
556 }
557
558 static gpointer
worker_thread(gpointer data)559 worker_thread(
560 gpointer data)
561 {
562 XferElement *elt = XFER_ELEMENT(data);
563 XferElementGlue *self = XFER_ELEMENT_GLUE(data);
564
565 switch (mech_pair(elt->input_mech, elt->output_mech)) {
566 case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
567 read_and_write(self);
568 break;
569
570 case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
571 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
572 read_and_push(self);
573 break;
574
575 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
576 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
577 pull_and_write(self);
578 break;
579
580 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
581 pull_and_push(self);
582 break;
583
584 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
585 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
586 if ((self->output_data_socket = do_directtcp_connect(self,
587 elt->downstream->input_listen_addrs)) == -1)
588 break;
589 self->write_fdp = &self->output_data_socket;
590 read_and_write(self);
591 break;
592
593 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
594 if ((self->output_data_socket = do_directtcp_connect(self,
595 elt->downstream->input_listen_addrs)) == -1)
596 break;
597 self->write_fdp = &self->output_data_socket;
598 pull_and_write(self);
599 break;
600
601 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
602 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
603 if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
604 break;
605 self->read_fdp = &self->input_data_socket;
606 read_and_write(self);
607 break;
608
609 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
610 if ((self->input_data_socket = do_directtcp_accept(self,
611 &self->input_listen_socket)) == -1)
612 break;
613 self->read_fdp = &self->input_data_socket;
614 read_and_push(self);
615 break;
616
617 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
618 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
619 case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
620 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
621 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
622 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
623 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
624 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
625 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
626 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
627 default:
628 g_assert_not_reached();
629 break;
630
631 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
632 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
633 if ((self->output_data_socket = do_directtcp_accept(self,
634 &self->output_listen_socket)) == -1)
635 break;
636 self->write_fdp = &self->output_data_socket;
637 read_and_write(self);
638 break;
639
640 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
641 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
642 if ((self->input_data_socket = do_directtcp_connect(self,
643 elt->upstream->output_listen_addrs)) == -1)
644 break;
645 self->read_fdp = &self->input_data_socket;
646 read_and_write(self);
647 break;
648
649 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
650 if ((self->input_data_socket = do_directtcp_connect(self,
651 elt->upstream->output_listen_addrs)) == -1)
652 break;
653 self->read_fdp = &self->input_data_socket;
654 read_and_push(self);
655 break;
656
657 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
658 if ((self->output_data_socket = do_directtcp_accept(self,
659 &self->output_listen_socket)) == -1)
660 break;
661 self->write_fdp = &self->output_data_socket;
662 pull_and_write(self);
663 break;
664
665 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
666 /* TODO: use async accept's here to avoid order dependency */
667 if ((self->output_data_socket = do_directtcp_accept(self,
668 &self->output_listen_socket)) == -1)
669 break;
670 self->write_fdp = &self->output_data_socket;
671 if ((self->input_data_socket = do_directtcp_accept(self,
672 &self->input_listen_socket)) == -1)
673 break;
674 self->read_fdp = &self->input_data_socket;
675 read_and_write(self);
676 break;
677
678 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
679 /* TODO: use async connects and select() to avoid order dependency here */
680 if ((self->input_data_socket = do_directtcp_connect(self,
681 elt->upstream->output_listen_addrs)) == -1)
682 break;
683 self->read_fdp = &self->input_data_socket;
684 if ((self->output_data_socket = do_directtcp_connect(self,
685 elt->downstream->input_listen_addrs)) == -1)
686 break;
687 self->write_fdp = &self->output_data_socket;
688 read_and_write(self);
689 break;
690 }
691
692 send_xfer_done(self);
693
694 return NULL;
695 }
696
697 /*
698 * Implementation
699 */
700
701 static gboolean
setup_impl(XferElement * elt)702 setup_impl(
703 XferElement *elt)
704 {
705 XferElementGlue *self = (XferElementGlue *)elt;
706 gboolean need_ring = FALSE;
707 gboolean need_listen_input = FALSE;
708 gboolean need_listen_output = FALSE;
709
710 g_assert(elt->input_mech != XFER_MECH_NONE);
711 g_assert(elt->output_mech != XFER_MECH_NONE);
712 g_assert(elt->input_mech != elt->output_mech);
713
714 self->read_fdp = NULL;
715 self->write_fdp = NULL;
716 self->on_push = PUSH_INVALID;
717 self->on_pull = PULL_INVALID;
718 self->need_thread = FALSE;
719
720 switch (mech_pair(elt->input_mech, elt->output_mech)) {
721 case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
722 /* thread will read from one fd and write to the other */
723 self->read_fdp = &neighboring_element_fd;
724 self->write_fdp = &neighboring_element_fd;
725 self->need_thread = TRUE;
726 break;
727
728 case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
729 /* thread will read from one fd and call push_buffer downstream */
730 self->read_fdp = &neighboring_element_fd;
731 self->need_thread = TRUE;
732 break;
733
734 case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
735 self->read_fdp = &neighboring_element_fd;
736 self->on_pull = PULL_FROM_FD;
737 break;
738
739 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
740 /* thread will connect for output, then read from fd and write to the
741 * socket. */
742 self->read_fdp = &neighboring_element_fd;
743 self->need_thread = TRUE;
744 break;
745
746 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
747 /* thread will accept output conn, then read from upstream and write to socket */
748 self->read_fdp = &neighboring_element_fd;
749 self->need_thread = TRUE;
750 need_listen_output = TRUE;
751 break;
752
753 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
754 make_pipe(self);
755 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
756 self->pipe[1] = -1; /* upstream will close this for us */
757 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
758 self->pipe[0] = -1; /* downstream will close this for us */
759 break;
760
761 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
762 /* thread will read from pipe and call downstream's push_buffer */
763 make_pipe(self);
764 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
765 self->pipe[1] = -1; /* upstream will close this for us */
766 self->read_fdp = &self->pipe[0];
767 self->need_thread = TRUE;
768 break;
769
770 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
771 make_pipe(self);
772 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
773 self->pipe[1] = -1; /* upstream will close this for us */
774 self->on_pull = PULL_FROM_FD;
775 self->read_fdp = &self->pipe[0];
776 break;
777
778 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
779 /* thread will connect for output, then read from pipe and write to socket */
780 make_pipe(self);
781 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
782 self->pipe[1] = -1; /* upstream will close this for us */
783 self->read_fdp = &self->pipe[0];
784 self->need_thread = TRUE;
785 break;
786
787 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
788 /* thread will accept output conn, then read from pipe and write to socket */
789 make_pipe(self);
790 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
791 self->pipe[1] = -1; /* upstream will close this for us */
792 self->read_fdp = &self->pipe[0];
793 self->need_thread = TRUE;
794 need_listen_output = TRUE;
795 break;
796
797 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
798 make_pipe(self);
799 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
800 self->pipe[0] = -1; /* downstream will close this for us */
801 self->on_push = PUSH_TO_FD;
802 self->write_fdp = &self->pipe[1];
803 break;
804
805 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
806 self->on_push = PUSH_TO_FD;
807 self->write_fdp = &neighboring_element_fd;
808 break;
809
810 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
811 self->on_push = PUSH_TO_RING_BUFFER;
812 self->on_pull = PULL_FROM_RING_BUFFER;
813 need_ring = TRUE;
814 break;
815
816 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
817 /* push will connect for output first */
818 self->on_push = PUSH_TO_FD | PUSH_CONNECT_FIRST;
819 break;
820
821 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
822 /* push will accept for output first */
823 self->on_push = PUSH_TO_FD | PUSH_ACCEPT_FIRST;
824 need_listen_output = TRUE;
825 break;
826
827 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
828 /* thread will pull from upstream and write to pipe */
829 make_pipe(self);
830 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
831 self->pipe[0] = -1; /* downstream will close this for us */
832 self->write_fdp = &self->pipe[1];
833 self->need_thread = TRUE;
834 break;
835
836 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
837 /* thread will pull from upstream and write to downstream */
838 self->write_fdp = &neighboring_element_fd;
839 self->need_thread = TRUE;
840 break;
841
842 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
843 /* thread will pull from upstream and push to downstream */
844 self->need_thread = TRUE;
845 break;
846
847 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
848 /* thread will connect for output, then pull from upstream and write to socket */
849 self->need_thread = TRUE;
850 break;
851
852 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
853 /* thread will accept for output, then pull from upstream and write to socket */
854 self->need_thread = TRUE;
855 need_listen_output = TRUE;
856 break;
857
858 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
859 /* thread will accept for input, then read from socket and write to pipe */
860 make_pipe(self);
861 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
862 self->pipe[0] = -1; /* downstream will close this for us */
863 self->write_fdp = &self->pipe[1];
864 self->need_thread = TRUE;
865 need_listen_input = TRUE;
866 break;
867
868 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
869 /* thread will accept for input, then read from socket and write to downstream */
870 self->write_fdp = &neighboring_element_fd;
871 self->need_thread = TRUE;
872 need_listen_input = TRUE;
873 break;
874
875 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
876 /* thread will accept for input, then read from socket and push downstream */
877 self->need_thread = TRUE;
878 need_listen_input = TRUE;
879 break;
880
881 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
882 /* first pull will accept for input, then read from socket */
883 self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
884 need_listen_input = TRUE;
885 break;
886
887 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
888 /* thread will accept on both sides, then copy from socket to socket */
889 self->need_thread = TRUE;
890 need_listen_input = TRUE;
891 need_listen_output = TRUE;
892 break;
893
894 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
895 /* thread will connect for input, then read from socket and write to pipe */
896 make_pipe(self);
897 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
898 self->pipe[0] = -1; /* downstream will close this for us */
899 self->write_fdp = &self->pipe[1];
900 self->need_thread = TRUE;
901 break;
902
903 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
904 /* thread will connect for input, then read from socket and write to downstream */
905 self->write_fdp = &neighboring_element_fd;
906 self->need_thread = TRUE;
907 break;
908
909 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
910 /* thread will connect for input, then read from socket and push downstream */
911 self->need_thread = TRUE;
912 break;
913
914 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
915 /* first pull will connect for input, then read from socket */
916 self->on_pull = PULL_FROM_FD | PULL_CONNECT_FIRST;
917 break;
918
919 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
920 /* thread will connect on both sides, then copy from socket to socket */
921 self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
922 self->need_thread = TRUE;
923 break;
924
925 default:
926 g_assert_not_reached();
927 break;
928 }
929
930 /* set up ring if desired */
931 if (need_ring) {
932 self->ring = g_try_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
933 if (self->ring == NULL) {
934 xfer_cancel_with_error(elt, "Can't allocate memory for ring");
935 return FALSE;
936 }
937 self->ring_used_sem = amsemaphore_new_with_value(0);
938 self->ring_free_sem = amsemaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
939 }
940
941 if (need_listen_input) {
942 if (!do_directtcp_listen(elt,
943 &self->input_listen_socket, &elt->input_listen_addrs))
944 return FALSE;
945 }
946 if (need_listen_output) {
947 if (!do_directtcp_listen(elt,
948 &self->output_listen_socket, &elt->output_listen_addrs))
949 return FALSE;
950 }
951
952 return TRUE;
953 }
954
955 static gboolean
start_impl(XferElement * elt)956 start_impl(
957 XferElement *elt)
958 {
959 XferElementGlue *self = (XferElementGlue *)elt;
960
961 if (self->need_thread)
962 self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL);
963
964 /* we're active if we have a thread that will eventually die */
965 return self->need_thread;
966 }
967
968 static gpointer
pull_buffer_impl(XferElement * elt,size_t * size)969 pull_buffer_impl(
970 XferElement *elt,
971 size_t *size)
972 {
973 XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
974
975 /* accept first, if required */
976 if (self->on_pull & PULL_ACCEPT_FIRST) {
977 /* don't accept the next time around */
978 self->on_pull &= ~PULL_ACCEPT_FIRST;
979
980 if (elt->cancelled) {
981 *size = 0;
982 return NULL;
983 }
984
985 if ((self->input_data_socket = do_directtcp_accept(self,
986 &self->input_listen_socket)) == -1) {
987 /* do_directtcp_accept already signalled an error; xfer
988 * is cancelled */
989 *size = 0;
990 return NULL;
991 }
992
993 /* read from this new socket */
994 self->read_fdp = &self->input_data_socket;
995 }
996
997 /* or connect first, if required */
998 if (self->on_pull & PULL_CONNECT_FIRST) {
999 /* don't connect the next time around */
1000 self->on_pull &= ~PULL_CONNECT_FIRST;
1001
1002 if (elt->cancelled) {
1003 *size = 0;
1004 return NULL;
1005 }
1006
1007 if ((self->input_data_socket = do_directtcp_connect(self,
1008 elt->upstream->output_listen_addrs)) == -1) {
1009 /* do_directtcp_connect already signalled an error; xfer
1010 * is cancelled */
1011 *size = 0;
1012 return NULL;
1013 }
1014
1015 /* read from this new socket */
1016 self->read_fdp = &self->input_data_socket;
1017 }
1018
1019 switch (self->on_pull) {
1020 case PULL_FROM_RING_BUFFER: {
1021 gpointer buf;
1022
1023 if (elt->cancelled) {
1024 /* the finalize method will empty the ring buffer */
1025 *size = 0;
1026 return NULL;
1027 }
1028
1029 /* make sure there's at least one element available */
1030 amsemaphore_down(self->ring_used_sem);
1031
1032 /* get it */
1033 buf = self->ring[self->ring_tail].buf;
1034 *size = self->ring[self->ring_tail].size;
1035 self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
1036
1037 /* and mark this element as free to be overwritten */
1038 amsemaphore_up(self->ring_free_sem);
1039
1040 return buf;
1041 }
1042
1043 case PULL_FROM_FD: {
1044 int fd = get_read_fd(self);
1045 char *buf;
1046 ssize_t len;
1047
1048 /* if the fd is already closed, it's possible upstream bailed out
1049 * so quickly that we didn't even get a look at the fd */
1050 if (elt->cancelled || fd == -1) {
1051 if (fd != -1) {
1052 if (elt->expect_eof)
1053 xfer_element_drain_fd(fd);
1054
1055 close_read_fd(self);
1056 }
1057
1058 *size = 0;
1059 return NULL;
1060 }
1061
1062 buf = g_malloc(GLUE_BUFFER_SIZE);
1063
1064 /* read from upstream */
1065 len = full_read(fd, buf, GLUE_BUFFER_SIZE);
1066 if (len < GLUE_BUFFER_SIZE) {
1067 if (errno) {
1068 if (!elt->cancelled) {
1069 xfer_cancel_with_error(elt,
1070 _("Error reading from fd %d: %s"), fd, strerror(errno));
1071 wait_until_xfer_cancelled(elt->xfer);
1072 }
1073
1074 /* return an EOF */
1075 amfree(buf);
1076 len = 0;
1077
1078 /* and finish off the upstream */
1079 if (elt->expect_eof) {
1080 xfer_element_drain_fd(fd);
1081 }
1082 close_read_fd(self);
1083 } else if (len == 0) {
1084 /* EOF */
1085 g_free(buf);
1086 buf = NULL;
1087 *size = 0;
1088
1089 /* signal EOF to downstream */
1090 close_read_fd(self);
1091 }
1092 }
1093
1094 *size = (size_t)len;
1095
1096 return buf;
1097 }
1098
1099 default:
1100 case PULL_INVALID:
1101 g_assert_not_reached();
1102 return NULL;
1103 }
1104 }
1105
1106 static void
push_buffer_impl(XferElement * elt,gpointer buf,size_t len)1107 push_buffer_impl(
1108 XferElement *elt,
1109 gpointer buf,
1110 size_t len)
1111 {
1112 XferElementGlue *self = (XferElementGlue *)elt;
1113
1114 /* accept first, if required */
1115 if (self->on_push & PUSH_ACCEPT_FIRST) {
1116 /* don't accept the next time around */
1117 self->on_push &= ~PUSH_ACCEPT_FIRST;
1118
1119 if (elt->cancelled) {
1120 return;
1121 }
1122
1123 if ((self->output_data_socket = do_directtcp_accept(self,
1124 &self->output_listen_socket)) == -1) {
1125 /* do_directtcp_accept already signalled an error; xfer
1126 * is cancelled */
1127 return;
1128 }
1129
1130 /* write to this new socket */
1131 self->write_fdp = &self->output_data_socket;
1132 }
1133
1134 /* or connect first, if required */
1135 if (self->on_push & PUSH_CONNECT_FIRST) {
1136 /* don't accept the next time around */
1137 self->on_push &= ~PUSH_CONNECT_FIRST;
1138
1139 if (elt->cancelled) {
1140 return;
1141 }
1142
1143 if ((self->output_data_socket = do_directtcp_connect(self,
1144 elt->downstream->input_listen_addrs)) == -1) {
1145 /* do_directtcp_connect already signalled an error; xfer
1146 * is cancelled */
1147 return;
1148 }
1149
1150 /* read from this new socket */
1151 self->write_fdp = &self->output_data_socket;
1152 }
1153
1154 switch (self->on_push) {
1155 case PUSH_TO_RING_BUFFER:
1156 /* just drop packets if the transfer has been cancelled */
1157 if (elt->cancelled) {
1158 amfree(buf);
1159 return;
1160 }
1161
1162 /* make sure there's at least one element free */
1163 amsemaphore_down(self->ring_free_sem);
1164
1165 /* set it */
1166 self->ring[self->ring_head].buf = buf;
1167 self->ring[self->ring_head].size = len;
1168 self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
1169
1170 /* and mark this element as available for reading */
1171 amsemaphore_up(self->ring_used_sem);
1172
1173 return;
1174
1175 case PUSH_TO_FD: {
1176 int fd = get_write_fd(self);
1177
1178 /* if the fd is already closed, it's possible upstream bailed out
1179 * so quickly that we didn't even get a look at the fd. In this
1180 * case we can assume the xfer has been cancelled and just discard
1181 * the data. */
1182 if (fd == -1)
1183 return;
1184
1185 if (elt->cancelled) {
1186 if (!elt->expect_eof || !buf) {
1187 close_write_fd(self);
1188
1189 /* hack to ensure we won't close the fd again, if we get another push */
1190 elt->expect_eof = TRUE;
1191 }
1192
1193 amfree(buf);
1194
1195 return;
1196 }
1197
1198 /* write the full buffer to the fd, or close on EOF */
1199 if (buf) {
1200 if (!elt->downstream->drain_mode &&
1201 full_write(fd, buf, len) < len) {
1202 if (elt->downstream->must_drain) {
1203 g_debug("Error writing to fd %d: %s",
1204 fd, strerror(errno));
1205 } else if (elt->downstream->ignore_broken_pipe &&
1206 errno == EPIPE) {
1207 } else {
1208 if (!elt->cancelled) {
1209 xfer_cancel_with_error(elt,
1210 _("Error writing to fd %d: %s"),
1211 fd, strerror(errno));
1212 wait_until_xfer_cancelled(elt->xfer);
1213 }
1214 /* nothing special to do to handle a cancellation */
1215 }
1216 elt->downstream->drain_mode = TRUE;
1217 }
1218 amfree(buf);
1219 } else {
1220 close_write_fd(self);
1221 }
1222
1223 return;
1224 }
1225
1226 default:
1227 case PUSH_INVALID:
1228 g_assert_not_reached();
1229 break;
1230 }
1231 }
1232
1233 static void
instance_init(XferElementGlue * self)1234 instance_init(
1235 XferElementGlue *self)
1236 {
1237 XferElement *elt = (XferElement *)self;
1238 elt->can_generate_eof = TRUE;
1239 self->pipe[0] = self->pipe[1] = -1;
1240 self->input_listen_socket = -1;
1241 self->output_listen_socket = -1;
1242 self->input_data_socket = -1;
1243 self->output_data_socket = -1;
1244 self->read_fd = -1;
1245 self->write_fd = -1;
1246 }
1247
1248 static void
finalize_impl(GObject * obj_self)1249 finalize_impl(
1250 GObject * obj_self)
1251 {
1252 XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
1253
1254 /* first make sure the worker thread has finished up */
1255 if (self->thread)
1256 g_thread_join(self->thread);
1257
1258 /* close our pipes and fd's if they're still open */
1259 if (self->pipe[0] != -1) close(self->pipe[0]);
1260 if (self->pipe[1] != -1) close(self->pipe[1]);
1261 if (self->input_data_socket != -1) close(self->input_data_socket);
1262 if (self->output_data_socket != -1) close(self->output_data_socket);
1263 if (self->input_listen_socket != -1) close(self->input_listen_socket);
1264 if (self->output_listen_socket != -1) close(self->output_listen_socket);
1265 if (self->read_fd != -1) close(self->read_fd);
1266 if (self->write_fd != -1) close(self->write_fd);
1267
1268 if (self->ring) {
1269 /* empty the ring buffer, ignoring syncronization issues */
1270 while (self->ring_used_sem->value) {
1271 if (self->ring[self->ring_tail].buf)
1272 amfree(self->ring[self->ring_tail].buf);
1273 self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
1274 }
1275
1276 amfree(self->ring);
1277 amsemaphore_free(self->ring_used_sem);
1278 amsemaphore_free(self->ring_free_sem);
1279 }
1280
1281 /* chain up */
1282 G_OBJECT_CLASS(parent_class)->finalize(obj_self);
1283 }
1284
1285 static xfer_element_mech_pair_t _pairs[] = {
1286 { XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1287 { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1288 { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1289 { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1290 { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1291
1292 { XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0) }, /* pipe */
1293 { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* pipe + read and call*/
1294 { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* pipe + read on demand */
1295 { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* pipe + splice or copy*/
1296 { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1297
1298 { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand + pipe */
1299 { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1300 { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0) }, /* async queue */
1301 { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1302 { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1303
1304 { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write + pipe */
1305 { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1306 { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1) }, /* call and call */
1307 { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1308 { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1309
1310 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1311 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1312 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1313 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1314 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1315
1316 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1317 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1318 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1319 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1320 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1321
1322 /* terminator */
1323 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1324 };
1325 xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;
1326
1327 static void
class_init(XferElementGlueClass * selfc)1328 class_init(
1329 XferElementGlueClass * selfc)
1330 {
1331 XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
1332 GObjectClass *goc = G_OBJECT_CLASS(selfc);
1333
1334 klass->setup = setup_impl;
1335 klass->start = start_impl;
1336 klass->push_buffer = push_buffer_impl;
1337 klass->pull_buffer = pull_buffer_impl;
1338
1339 klass->perl_class = "Amanda::Xfer::Element::Glue";
1340 klass->mech_pairs = xfer_element_glue_mech_pairs;
1341
1342 goc->finalize = finalize_impl;
1343
1344 parent_class = g_type_class_peek_parent(selfc);
1345 }
1346
1347 GType
xfer_element_glue_get_type(void)1348 xfer_element_glue_get_type (void)
1349 {
1350 static GType type = 0;
1351
1352 if G_UNLIKELY(type == 0) {
1353 static const GTypeInfo info = {
1354 sizeof (XferElementGlueClass),
1355 (GBaseInitFunc) NULL,
1356 (GBaseFinalizeFunc) NULL,
1357 (GClassInitFunc) class_init,
1358 (GClassFinalizeFunc) NULL,
1359 NULL /* class_data */,
1360 sizeof (XferElementGlue),
1361 0 /* n_preallocs */,
1362 (GInstanceInitFunc) instance_init,
1363 NULL
1364 };
1365
1366 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferElementGlue", &info, 0);
1367 }
1368
1369 return type;
1370 }
1371
1372 /* create an element of this class; prototype is in xfer-element.h */
1373 XferElement *
xfer_element_glue(void)1374 xfer_element_glue(void)
1375 {
1376 XferElementGlue *self = (XferElementGlue *)g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL);
1377 XferElement *elt = XFER_ELEMENT(self);
1378
1379 return elt;
1380 }
1381