/*
 * linc-connection.c: This file is part of the linc library.
 *
 * Authors:
 *    Elliot Lee      (sopwith@redhat.com)
 *    Michael Meeks   (michael@ximian.com)
 *    Mark McLoughlin (mark@skynet.ie) & others
 *
 * Copyright 2001, Red Hat, Inc., Ximian, Inc.,
 *                 Sun Microsystems, Inc.
 */
12 #include <config.h>
13 #include <stdarg.h>
14 #include <fcntl.h>
15 #include <errno.h>
16 #include <string.h>
17 #include <stdio.h>
18 #include <ctype.h>
19
20 #ifdef LINK_SSL_SUPPORT
21 # include <openssl/ssl.h>
22 #endif
23
24 #include "linc-private.h"
25 #include "linc-compat.h"
26 #include <linc/linc-config.h>
27 #include <linc/linc-connection.h>
28
29 static GObjectClass *parent_class = NULL;
30 static guint _link_timeout = 0;
31
32 enum {
33 BROKEN,
34 BLOCKING,
35 LAST_SIGNAL
36 };
37 static guint signals [LAST_SIGNAL];
38 static GList *cnx_list = NULL;
39
/*
 * Locking helpers.
 *
 * Every variant maps onto the single global linc lock, so the cnx
 * argument is currently ignored.
 *
 * None of the expansions ends in a semicolon: the caller supplies it,
 * which keeps each use a single statement (safe in an unbraced
 * if / else).
 */
#define CNX_LOCK(cnx)            G_STMT_START { link_lock ();   } G_STMT_END
#define CNX_UNLOCK(cnx)          G_STMT_START { link_unlock (); } G_STMT_END
#define CNX_LIST_LOCK()          CNX_LOCK (0)     /* for now */
#define CNX_LIST_UNLOCK()        CNX_UNLOCK (0)   /* for now */
#define CNX_AND_LIST_LOCK(cnx)   CNX_LOCK (cnx)   /* for now */
#define CNX_AND_LIST_UNLOCK(cnx) CNX_UNLOCK (cnx) /* for now */
#define CNX_IS_LOCKED(cnx)       link_is_locked()
47
48 static gboolean link_connection_io_handler (GIOChannel *gioc,
49 GIOCondition condition,
50 gpointer data);
51
52 static inline LinkConnection*
link_connection_ref_T(LinkConnection * cnx)53 link_connection_ref_T (LinkConnection *cnx)
54 {
55 return LINK_CONNECTION (g_object_ref (G_OBJECT (cnx)));
56 }
57
58 LinkConnection *
link_connection_ref(LinkConnection * cnx)59 link_connection_ref (LinkConnection *cnx)
60 {
61 CNX_AND_LIST_LOCK (cnx);
62 g_object_ref (G_OBJECT (cnx));
63 CNX_AND_LIST_UNLOCK (cnx);
64
65 return cnx;
66 }
67
68 /* Only call if we are _certain_ that we don't hold the last ref */
69 static void
link_connection_unref_T_(LinkConnection * cnx)70 link_connection_unref_T_ (LinkConnection *cnx)
71 {
72 g_assert (((GObject *)cnx)->ref_count > 1);
73 g_object_unref (G_OBJECT (cnx));
74 }
75
76 static void
link_connection_unref_unlock(LinkConnection * cnx)77 link_connection_unref_unlock (LinkConnection *cnx)
78 {
79 gboolean tail_unref = FALSE;
80
81 if (((GObject *)cnx)->ref_count > 1)
82 g_object_unref (G_OBJECT (cnx));
83
84 else {
85 cnx_list = g_list_remove (cnx_list, cnx);
86 tail_unref = TRUE;
87 }
88
89 CNX_AND_LIST_UNLOCK (cnx);
90
91 if (tail_unref) {
92 LinkCommandCnxUnref cmd[1];
93
94 cmd->cmd.cmd.type = LINK_COMMAND_CNX_UNREF;
95 cmd->cmd.complete = FALSE;
96 cmd->cnx = cnx;
97 link_exec_command ((LinkCommand *) cmd);
98 }
99 }
100
101 void
link_connection_exec_cnx_unref(LinkCommandCnxUnref * cmd,gboolean immediate)102 link_connection_exec_cnx_unref (LinkCommandCnxUnref *cmd, gboolean immediate)
103 {
104 d_printf ("Exec defered unref on %p\n", cmd->cnx);
105
106 if (immediate) /* In I/O thread - with just 1 ref left */
107 g_object_unref (G_OBJECT (cmd->cnx));
108 else {
109 CNX_AND_LIST_LOCK (cmd->cnx);
110 link_connection_unref_unlock (cmd->cnx);
111 }
112 }
113
114 void
link_connection_unref(LinkConnection * cnx)115 link_connection_unref (LinkConnection *cnx)
116 {
117 g_return_if_fail (cnx != NULL);
118
119 CNX_AND_LIST_LOCK (cnx);
120
121 link_connection_unref_unlock (cnx);
122 }
123
124 static void
link_close_fd(LinkConnection * cnx)125 link_close_fd (LinkConnection *cnx)
126 {
127 if (cnx->priv->fd >= 0) {
128 d_printf ("link_close_fd: closing %d\n", cnx->priv->fd);
129 LINK_CLOSE_SOCKET (cnx->priv->fd);
130 }
131 cnx->priv->fd = -1;
132 }
133
134 typedef struct {
135 LinkBrokenCallback fn;
136 gpointer user_data;
137 } BrokenCallback;
138
139 static void
link_connection_emit_broken(LinkConnection * cnx,GSList * callbacks)140 link_connection_emit_broken (LinkConnection *cnx, GSList *callbacks)
141 {
142 GSList *l;
143
144 for (l = callbacks; l; l = l->next) {
145 BrokenCallback *bc = l->data;
146 bc->fn (cnx, bc->user_data);
147 g_free (bc);
148 }
149 g_slist_free (callbacks);
150 }
151
152 /*
153 * Unfortunate to have a global list, but we need to know
154 * if this is being processed in the main thread & if so
155 * simply append to it.
156 */
157 static GSList *idle_broken_cnxs = NULL;
158
159 static gboolean
link_connection_broken_idle(gpointer dummy)160 link_connection_broken_idle (gpointer dummy)
161 {
162 GSList *callbacks;
163 LinkConnection *cnx;
164
165 d_printf ("Connection broken idle ...\n");
166
167 do {
168 link_lock();
169 cnx = NULL;
170 if (idle_broken_cnxs != NULL) {
171 cnx = idle_broken_cnxs->data;
172 idle_broken_cnxs = g_slist_delete_link (idle_broken_cnxs, idle_broken_cnxs);
173 }
174 if (cnx) {
175 callbacks = cnx->idle_broken_callbacks;
176 cnx->idle_broken_callbacks = NULL;
177 cnx->inhibit_reconnect = FALSE;
178 link_signal ();
179 }
180 link_unlock ();
181
182 if (cnx) {
183 link_connection_emit_broken (cnx, callbacks);
184 link_connection_unref (cnx);
185 }
186 } while (cnx != NULL);
187
188 return FALSE;
189 }
190
191 static void
add_idle_broken_for_cnx_T(LinkConnection * cnx)192 add_idle_broken_for_cnx_T (LinkConnection *cnx)
193 {
194 if (idle_broken_cnxs != NULL) {
195 fprintf (stderr, "Deadlock potential - avoiding evil bug!\n");
196 /*
197 * just append ourself & exit - we don't want to
198 * inhibit re-connection because of the horrendous
199 * deadlock possible, cf. g#534351#
200 */
201 if (g_slist_find (idle_broken_cnxs, cnx) != NULL)
202 return;
203 } else {
204 /* inhibit reconnect while we emit 'broken' */
205 cnx->inhibit_reconnect = TRUE;
206 g_idle_add (link_connection_broken_idle, NULL);
207 }
208 link_connection_ref_T (cnx);
209 idle_broken_cnxs = g_slist_prepend (idle_broken_cnxs, cnx);
210 }
211
212 static void
link_source_remove(LinkConnection * cnx)213 link_source_remove (LinkConnection *cnx)
214 {
215 if (cnx->priv->tag) {
216 LinkWatch *thewatch = cnx->priv->tag;
217 cnx->priv->tag = NULL;
218 link_io_remove_watch (thewatch);
219 d_printf ("Removed watch on %d\n", cnx->priv->fd);
220 }
221 }
222
223 static void
link_source_add(LinkConnection * cnx,GIOCondition condition)224 link_source_add (LinkConnection *cnx,
225 GIOCondition condition)
226 {
227 g_assert (cnx->priv->tag == NULL);
228
229 cnx->priv->tag = link_io_add_watch_fd (
230 cnx->priv->fd, condition,
231 link_connection_io_handler, cnx);
232
233 d_printf ("Added watch on %d (0x%x)\n",
234 cnx->priv->fd, condition);
235 }
236
237 typedef struct {
238 guchar *data;
239
240 struct iovec *vecs;
241 int nvecs;
242 struct iovec single_vec;
243 } QueuedWrite;
244
245 static void
queued_write_free(QueuedWrite * qw)246 queued_write_free (QueuedWrite *qw)
247 {
248 g_free (qw->data);
249 g_free (qw);
250 }
251
252 static void
queue_free(LinkConnection * cnx)253 queue_free (LinkConnection *cnx)
254 {
255 GList *l;
256
257 for (l = cnx->priv->write_queue; l; l = l->next)
258 queued_write_free (l->data);
259
260 g_list_free (cnx->priv->write_queue);
261 cnx->priv->write_queue = NULL;
262 }
263
264 static void
dispatch_callbacks_drop_lock(LinkConnection * cnx)265 dispatch_callbacks_drop_lock (LinkConnection *cnx)
266 {
267 GSList *callbacks;
268
269 callbacks = cnx->idle_broken_callbacks;
270 cnx->idle_broken_callbacks = NULL;
271
272 CNX_UNLOCK (cnx);
273 link_connection_emit_broken (cnx, callbacks);
274 CNX_LOCK (cnx);
275 }
276
277 /*
278 * link_connection_class_state_changed:
279 * @cnx: a #LinkConnection
280 * @status: a #LinkConnectionStatus value.
281 *
282 * Set up linc's #GSources if the connection is in the #LINK_CONNECTED
283 * or #LINK_CONNECTING state.
284 *
285 * Remove the #GSources if the state has channged to #LINK_DISCONNECTED,
286 * close the socket and a gobject broken signal which may be caught by
287 * the application.
288 *
289 * Also perform SSL specific operations if the connection has move into
290 * the #LINK_CONNECTED state.
291 */
292 static void
link_connection_state_changed_T_R(LinkConnection * cnx,LinkConnectionStatus status)293 link_connection_state_changed_T_R (LinkConnection *cnx,
294 LinkConnectionStatus status)
295 {
296 gboolean changed;
297 LinkConnectionClass *klass;
298
299 g_assert (CNX_IS_LOCKED (cnx));
300
301 d_printf ("State changing from '%s' to '%s' on fd %d\n",
302 STATE_NAME (cnx->status), STATE_NAME (status),
303 cnx->priv->fd);
304
305 changed = cnx->status != status;
306
307 cnx->status = status;
308
309 switch (status) {
310 case LINK_CONNECTED:
311 #ifdef LINK_SSL_SUPPORT
312 if (cnx->options & LINK_CONNECTION_SSL) {
313 if (cnx->was_initiated)
314 SSL_connect (cnx->priv->ssl);
315 else
316 SSL_accept (cnx->priv->ssl);
317 }
318 #endif
319 if (!cnx->priv->tag)
320 link_source_add (cnx, LINK_ERR_CONDS | LINK_IN_CONDS);
321 break;
322
323 case LINK_CONNECTING:
324
325 if (cnx->priv->tag) /* re-connecting */
326 link_watch_set_condition (
327 cnx->priv->tag,
328 G_IO_OUT | LINK_ERR_CONDS);
329 else
330 link_source_add (cnx, G_IO_OUT | LINK_ERR_CONDS);
331 break;
332
333 case LINK_DISCONNECTED:
334 case LINK_TIMEOUT:
335 link_source_remove (cnx);
336 link_close_fd (cnx);
337 queue_free (cnx);
338 /* don't free pending queue - we could get re-connected */
339 if (changed) {
340
341 if (!cnx->priv->was_disconnected) {
342 d_printf ("Emitting the broken signal on %p\n", cnx);
343 CNX_UNLOCK (cnx);
344 g_signal_emit (cnx, signals [BROKEN], 0);
345 CNX_LOCK (cnx);
346 }
347
348 if (cnx->idle_broken_callbacks) {
349 if (!link_thread_io ()) {
350 d_printf ("Immediate broken callbacks at immediately\n");
351
352 dispatch_callbacks_drop_lock (cnx);
353 } else {
354 d_printf ("Queuing broken callbacks at idle\n");
355 add_idle_broken_for_cnx_T (cnx);
356 }
357 }
358 }
359 break;
360 }
361
362 klass = (LinkConnectionClass *)G_OBJECT_GET_CLASS (cnx);
363
364 if (klass->state_changed) {
365 link_signal ();
366 CNX_UNLOCK (cnx);
367 klass->state_changed (cnx, status);
368 CNX_LOCK (cnx);
369 }
370 }
371
372 static void
queue_signal_T_R(LinkConnection * cnx,glong delta)373 queue_signal_T_R (LinkConnection *cnx,
374 glong delta)
375 {
376 gulong old_size;
377 gulong new_size;
378
379 d_printf ("Queue signal %ld bytes, delta %ld, max %ld\n",
380 cnx->priv->write_queue_bytes, delta,
381 cnx->priv->max_buffer_bytes);
382
383 old_size = cnx->priv->write_queue_bytes;
384 cnx->priv->write_queue_bytes += delta;
385 new_size = cnx->priv->write_queue_bytes;
386
387 if (cnx->options & LINK_CONNECTION_BLOCK_SIGNAL) {
388 if (new_size == 0 ||
389 (old_size < (cnx->priv->max_buffer_bytes >> 1) &&
390 new_size >= (cnx->priv->max_buffer_bytes >> 1)) ||
391 new_size >= cnx->priv->max_buffer_bytes) {
392 CNX_UNLOCK (cnx);
393 g_signal_emit (cnx, signals [BLOCKING], 0, new_size);
394 CNX_LOCK (cnx);
395 }
396 }
397
398 if (cnx->priv->max_buffer_bytes &&
399 cnx->priv->write_queue_bytes >= cnx->priv->max_buffer_bytes)
400 link_connection_state_changed_T_R (cnx, LINK_DISCONNECTED);
401 }
402
403 static gulong
calc_size(struct iovec * src_vecs,int nvecs)404 calc_size (struct iovec *src_vecs,
405 int nvecs)
406 {
407 int i;
408 gulong total_size = 0;
409
410 for (i = 0; i < nvecs; i++)
411 total_size += src_vecs [i].iov_len;
412
413 return total_size;
414 }
415
416 static void
queue_flattened_T_R(LinkConnection * cnx,struct iovec * src_vecs,int nvecs,gboolean update_poll)417 queue_flattened_T_R (LinkConnection *cnx,
418 struct iovec *src_vecs,
419 int nvecs,
420 gboolean update_poll)
421 {
422 int i;
423 guchar *p;
424 gulong total_size;
425 gboolean new_queue;
426 QueuedWrite *qw = g_new (QueuedWrite, 1);
427
428 total_size = calc_size (src_vecs, nvecs);
429
430 p = g_malloc (total_size);
431
432 qw->data = p;
433 qw->vecs = &qw->single_vec;
434 qw->nvecs = 1;
435
436 qw->vecs->iov_base = p;
437 qw->vecs->iov_len = total_size;
438
439 for (i = 0; i < nvecs; i++) {
440 memcpy (p, src_vecs [i].iov_base, src_vecs [i].iov_len);
441 p += src_vecs [i].iov_len;
442 }
443 g_assert (p == (qw->data + total_size));
444
445 d_printf ("Queueing write of %ld bytes on fd %d\n",
446 total_size, cnx->priv->fd);
447
448 new_queue = cnx->priv->write_queue == NULL;
449 cnx->priv->write_queue = g_list_append (cnx->priv->write_queue, qw);
450 queue_signal_T_R (cnx, total_size);
451
452 if (update_poll && new_queue) {
453 LinkCommandSetCondition *cmd;
454
455 cmd = g_new (LinkCommandSetCondition, 1);
456 cmd->cmd.type = LINK_COMMAND_SET_CONDITION;
457 cmd->cnx = link_connection_ref_T (cnx);
458 cmd->condition = (LINK_ERR_CONDS | LINK_IN_CONDS | G_IO_OUT);
459 link_exec_command (&cmd->cmd);
460 }
461 }
462
463 static void
link_connection_from_fd_T(LinkConnection * cnx,int fd,const LinkProtocolInfo * proto,gchar * remote_host_info,gchar * remote_serv_info,gboolean was_initiated,LinkConnectionStatus status,LinkConnectionOptions options)464 link_connection_from_fd_T (LinkConnection *cnx,
465 int fd,
466 const LinkProtocolInfo *proto,
467 gchar *remote_host_info,
468 gchar *remote_serv_info,
469 gboolean was_initiated,
470 LinkConnectionStatus status,
471 LinkConnectionOptions options)
472 {
473 cnx->was_initiated = was_initiated;
474 cnx->is_auth = (proto->flags & LINK_PROTOCOL_SECURE);
475 cnx->proto = proto;
476 cnx->options = options;
477 cnx->priv->fd = fd;
478
479 g_free (cnx->remote_host_info);
480 cnx->remote_host_info = remote_host_info;
481 g_free (cnx->remote_serv_info);
482 cnx->remote_serv_info = remote_serv_info;
483
484 switch (cnx->proto->family) {
485 case AF_INET:
486 #ifdef AF_INET6
487 case AF_INET6:
488 #endif
489 if (_link_timeout && !cnx->timeout_msec) /* this should'nt happen twice but I'm always paranoid... */
490 cnx->timeout_msec = _link_timeout;
491 break;
492 default:
493 break;
494 }
495
496 d_printf ("Cnx from fd (%d) '%s', '%s', '%s'\n",
497 fd, proto->name,
498 remote_host_info ? remote_host_info : "<Null>",
499 remote_serv_info ? remote_serv_info : "<Null>");
500
501 if (proto->setup)
502 proto->setup (fd, options);
503
504 #ifdef LINK_SSL_SUPPORT
505 if (options & LINK_CONNECTION_SSL) {
506 cnx->priv->ssl = SSL_new (link_ssl_ctx);
507 SSL_set_fd (cnx->priv->ssl, fd);
508 }
509 #endif
510 g_assert (CNX_IS_LOCKED (0));
511 link_connection_state_changed_T_R (cnx, status);
512
513 if (!g_list_find (cnx_list, cnx))
514 cnx_list = g_list_prepend (cnx_list, cnx);
515 }
516
517 /*
518 * link_connection_from_fd:
519 * @cnx: a #LinkConnection.
520 * @fd: a connected/connecting file descriptor.
521 * @proto: a #LinkProtocolInfo.
522 * @remote_host_info: protocol dependant host information; gallocation swallowed
523 * @remote_serv_info: protocol dependant service information(e.g. port number). gallocation swallowed
524 * @was_initiated: #TRUE if the connection was initiated by us.
525 * @status: a #LinkConnectionStatus value.
526 * @options: combination of #LinkConnectionOptions.
527 *
528 * Fill in @cnx, call protocol specific initialisation methonds and then
529 * call link_connection_state_changed.
530 *
531 * Return Value: #TRUE if the function succeeds, #FALSE otherwise.
532 */
533 void
link_connection_from_fd(LinkConnection * cnx,int fd,const LinkProtocolInfo * proto,gchar * remote_host_info,gchar * remote_serv_info,gboolean was_initiated,LinkConnectionStatus status,LinkConnectionOptions options)534 link_connection_from_fd (LinkConnection *cnx,
535 int fd,
536 const LinkProtocolInfo *proto,
537 gchar *remote_host_info,
538 gchar *remote_serv_info,
539 gboolean was_initiated,
540 LinkConnectionStatus status,
541 LinkConnectionOptions options)
542 {
543 CNX_LOCK (cnx);
544
545 link_connection_from_fd_T (cnx, fd, proto,
546 remote_serv_info, remote_serv_info,
547 was_initiated, status, options);
548 CNX_UNLOCK (cnx);
549 }
550
#ifndef G_OS_WIN32
/*
 * Give @filename the owner of the directory that contains it (the
 * group is left unchanged). The directory is the part of @filename in
 * front of the last '/'.
 *
 * This is best effort: nothing happens when @filename is NULL or has
 * no '/', or when the directory cannot be examined. A failing chown is
 * only reported through d_printf.
 */
static void
fix_permissions (const char *filename)
{
	char        *tmp_dir = g_strdup (filename);
	char        *p;
	struct stat  stat_buf;

	if (!tmp_dir)
		return;

	p = strrchr (tmp_dir, '/');
	if (p) {
		*p = '\0';

		/* stat_buf is only meaningful when stat() succeeded */
		if (stat (tmp_dir, &stat_buf) == 0) {
			if (chown (filename, stat_buf.st_uid, -1) < 0) {
				d_printf ("chown failed: %s\n",
					  link_strerror (errno));
			}
		}
	}

	g_free (tmp_dir);
}
#endif
569
570 static gboolean
link_connection_do_initiate(LinkConnection * cnx,const char * proto_name,const char * host,const char * service,LinkConnectionOptions options)571 link_connection_do_initiate (LinkConnection *cnx,
572 const char *proto_name,
573 const char *host,
574 const char *service,
575 LinkConnectionOptions options)
576 {
577 const LinkProtocolInfo *proto;
578 int rv;
579 int fd;
580 gboolean retval = FALSE;
581 struct sockaddr *saddr;
582 LinkSockLen saddr_len;
583
584 proto = link_protocol_find (proto_name);
585
586 if (!proto)
587 return FALSE;
588
589 saddr = link_protocol_get_sockaddr (
590 proto, host, service, &saddr_len);
591
592 if (!saddr && (strcmp (proto_name, "IPv6") ==0)) {/* Falling back to IPv4 */
593 proto = link_protocol_find ("IPv4");
594
595 saddr = link_protocol_get_sockaddr (
596 proto, host, service, &saddr_len);
597 }
598
599 if (!saddr)
600 return FALSE;
601
602 fd = socket (proto->family, SOCK_STREAM,
603 proto->stream_proto_num);
604 #ifdef HAVE_WINSOCK2_H
605 if (fd == INVALID_SOCKET) {
606 fd = -1;
607 link_map_winsock_error_to_errno ();
608 }
609 #endif
610
611 if (fd < 0) {
612 d_printf ("socket() failed: %s\n", link_strerror (errno));
613 goto out;
614 }
615
616 if (options & LINK_CONNECTION_NONBLOCKING) {
617 #ifdef HAVE_WINSOCK2_H
618 u_long yes = 1;
619 if (ioctlsocket (fd, FIONBIO, &yes) == SOCKET_ERROR) {
620 link_map_winsock_error_to_errno ();
621 d_printf ("ioctlsocket(FIONBIO) failed: %s\n",
622 link_strerror (errno));
623 goto out;
624 }
625 #else
626 if (fcntl (fd, F_SETFL, O_NONBLOCK) < 0)
627 goto out;
628 #endif
629 }
630
631 #if defined (F_SETFD) && defined (FD_CLOEXEC)
632 if (fcntl (fd, F_SETFD, FD_CLOEXEC) < 0)
633 goto out;
634 #endif
635 #ifdef HAVE_WINSOCK2_H
636 {
637 SOCKET newfd;
638
639 if (!DuplicateHandle (GetCurrentProcess (), (HANDLE) fd,
640 GetCurrentProcess (), (LPHANDLE) &newfd,
641 0, FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE)) {
642 d_printf ("DuplicateHandle failed: %s\n", link_strerror (WSAGetLastError ()));
643 return FALSE;
644 }
645 fd = newfd;
646 }
647 #endif
648
649 #ifndef G_OS_WIN32
650 if (!strcmp (proto_name, "UNIX") && getuid() == 0) {
651 fix_permissions (service);
652 }
653 #endif
654
655 LINK_TEMP_FAILURE_RETRY_SOCKET (connect (fd, saddr, saddr_len), rv);
656 #ifdef HAVE_WINSOCK2_H
657 if (rv == SOCKET_ERROR) {
658 if ((options & LINK_CONNECTION_NONBLOCKING) &&
659 WSAGetLastError () == WSAEWOULDBLOCK) {
660 /* connect() for nonblocking sockets always
661 * fails with WSAEWOULDBLOCK. We have to
662 * select() to wait for actual status.
663 */
664 fd_set write_fds, except_fds;
665
666 FD_ZERO (&write_fds);
667 FD_SET (fd, &write_fds);
668
669 FD_ZERO (&except_fds);
670 FD_SET (fd, &except_fds);
671
672 rv = select (1, NULL, &write_fds, &except_fds, NULL);
673 if (rv == SOCKET_ERROR) {
674 rv = -1;
675 link_map_winsock_error_to_errno ();
676 } else if (FD_ISSET (fd, &write_fds)) {
677 rv = 0;
678 } else if (FD_ISSET (fd, &except_fds)) {
679 rv = -1;
680 errno = WSAECONNREFUSED;
681 }
682 } else {
683 rv = -1;
684 link_map_winsock_error_to_errno ();
685 }
686 }
687 #endif
688 if (rv && errno != EINPROGRESS)
689 goto out;
690
691 d_printf ("initiate 'connect' on new fd %d\n", fd);
692
693 g_assert (CNX_IS_LOCKED (0));
694 link_connection_from_fd_T
695 (cnx, fd, proto,
696 g_strdup (host), g_strdup (service),
697 TRUE, rv ? LINK_CONNECTING : LINK_CONNECTED,
698 options);
699 retval = TRUE;
700
701 out:
702 if (!retval && fd >= 0) {
703 d_printf ("initiation failed: %s\n", link_strerror (errno));
704 d_printf ("closing %d\n", fd);
705 LINK_CLOSE_SOCKET (fd);
706 }
707
708 g_free (saddr);
709
710 return retval;
711 }
712
713 static LinkConnectionStatus
link_connection_wait_connected_T(LinkConnection * cnx)714 link_connection_wait_connected_T (LinkConnection *cnx)
715 {
716 while (cnx && cnx->status == LINK_CONNECTING)
717 link_wait ();
718
719 return cnx ? cnx->status : LINK_DISCONNECTED;
720 }
721
722 LinkConnectionStatus
link_connection_try_reconnect(LinkConnection * cnx)723 link_connection_try_reconnect (LinkConnection *cnx)
724 {
725 LinkConnectionStatus status;
726
727 g_return_val_if_fail (LINK_IS_CONNECTION (cnx), LINK_DISCONNECTED);
728
729 CNX_LOCK (cnx);
730
731 d_printf ("Try for reconnection on %p: %d\n",
732 cnx, cnx->inhibit_reconnect);
733
734 while (cnx->inhibit_reconnect) {
735 if (g_main_context_acquire (NULL)) {
736 d_printf ("Dispatch callbacks in 'main' (mainloop owning) thread\n");
737 cnx->inhibit_reconnect = FALSE;
738 dispatch_callbacks_drop_lock (cnx);
739 g_main_context_release (NULL);
740 } else
741 link_wait ();
742 }
743
744 switch (cnx->status) {
745 case LINK_DISCONNECTED :
746 case LINK_TIMEOUT :
747 link_connection_do_initiate
748 (cnx, cnx->proto->name, cnx->remote_host_info,
749 cnx->remote_serv_info, cnx->options);
750 break;
751 default :
752 g_warning ("trying to re-connect connected cnx.");
753 break;
754 }
755
756 cnx->priv->was_disconnected = TRUE;
757 status = link_connection_wait_connected_T (cnx);
758 cnx->priv->was_disconnected = FALSE;
759
760 CNX_UNLOCK (cnx);
761
762 return status;
763 }
764
765 /**
766 * link_connection_initiate_list:
767 * @derived_type: a #LinkConnection derived type
768 * @proto_name: the name of the protocol to use.
769 * @host: protocol dependant host information.
770 * @service: protocol dependant service information(e.g. port number).
771 * @options: combination of #LinkConnectionOptions.
772 * @opt_construct_fn: optional constructor fn for new cnx's or NULL
773 * @user_data: optional user data for constructor
774 *
775 * Looks up a connection in our cnx. list to see if we already
776 * have a matching connection; if so returns it, otherwise
777 * constructs a new cnx. and retursn that
778 *
779 * Return value: an incremented cnx ref.
780 **/
781 LinkConnection *
link_connection_initiate(GType derived_type,const char * proto_name,const char * remote_host_info,const char * remote_serv_info,LinkConnectionOptions options,const char * first_property,...)782 link_connection_initiate (GType derived_type,
783 const char *proto_name,
784 const char *remote_host_info,
785 const char *remote_serv_info,
786 LinkConnectionOptions options,
787 const char *first_property,
788 ...)
789 {
790 va_list args;
791 GList *l;
792 gboolean initiated = TRUE;
793 LinkConnection *cnx = NULL;
794 const LinkProtocolInfo *proto;
795
796 va_start (args, first_property);
797
798 proto = link_protocol_find (proto_name);
799
800 CNX_LIST_LOCK();
801
802 /* FIXME: hash this if it's slow */
803 for (l = cnx_list; l; l = l->next) {
804 cnx = l->data;
805
806 if (cnx->was_initiated && cnx->proto == proto &&
807 cnx->status != LINK_DISCONNECTED &&
808 ((cnx->options & LINK_CONNECTION_SSL) == (options & LINK_CONNECTION_SSL)) &&
809 !strcmp (remote_host_info, cnx->remote_host_info) &&
810 !strcmp (remote_serv_info, cnx->remote_serv_info)) {
811 cnx = link_connection_ref_T (cnx);
812 break;
813 }
814 }
815
816 cnx = l ? l->data : NULL;
817
818 if (!cnx) {
819 cnx = LINK_CONNECTION
820 (g_object_new_valist (derived_type, first_property, args));
821
822 initiated = link_connection_do_initiate
823 (cnx, proto_name, remote_host_info,
824 remote_serv_info, options);
825 }
826
827 CNX_LIST_UNLOCK();
828
829 if (!initiated) {
830 link_connection_unref (cnx);
831 cnx = NULL;
832 }
833
834 va_end (args);
835
836 return cnx;
837 }
838
839 /*
840 * link_connection_state_changed:
841 * @cnx: a #LinkConnection.
842 * @status: a #LinkConnectionStatus.
843 *
844 * A wrapper for the #LinkConnectionClass's state change method.
845 */
846 void
link_connection_state_changed(LinkConnection * cnx,LinkConnectionStatus status)847 link_connection_state_changed (LinkConnection *cnx,
848 LinkConnectionStatus status)
849 {
850 CNX_LOCK (cnx);
851 link_connection_state_changed_T_R (cnx, status);
852 CNX_UNLOCK (cnx);
853 }
854
855 /**
856 * link_connection_read:
857 * @cnx: the connection to write to
858 * @buf: a pointer to the start of an array of bytes to read data into
859 * @len: the length of the array in bytes to read ingo
860 * @block_for_full_read: whether to block for a full read
861 *
862 * Warning, block_for_full_read is of limited usefullness.
863 *
864 * Return value: number of bytes written on success; negative on error.
865 **/
866 glong
link_connection_read(LinkConnection * cnx,guchar * buf,int len,gboolean block_for_full_read)867 link_connection_read (LinkConnection *cnx,
868 guchar *buf,
869 int len,
870 gboolean block_for_full_read)
871 {
872 int bytes_read = 0;
873
874 d_printf ("Read up to %d bytes from fd %d\n", len, cnx->priv->fd);
875
876 if (!len)
877 return 0;
878
879 CNX_LOCK (cnx);
880
881 if (cnx->status != LINK_CONNECTED)
882 goto fatal_error;
883
884 do {
885 int n;
886
887 #ifdef LINK_SSL_SUPPORT
888 if (cnx->options & LINK_CONNECTION_SSL)
889 n = SSL_read (cnx->priv->ssl, buf, len);
890 else
891 #endif
892 #ifdef HAVE_WINSOCK2_H
893 if ((n = recv (cnx->priv->fd, buf, len, 0)) == SOCKET_ERROR) {
894 n = -1;
895 link_map_winsock_error_to_errno ();
896 d_printf ("recv failed: %s\n",
897 link_strerror (errno));
898 }
899 #else
900 LINK_TEMP_FAILURE_RETRY_SYSCALL (read (cnx->priv->fd,
901 buf,
902 len), n);
903 #endif
904 g_assert (n <= len);
905
906 if (n < 0) {
907 #ifdef LINK_SSL_SUPPORT
908 if (cnx->options & LINK_CONNECTION_SSL) {
909 gulong rv;
910
911 rv = SSL_get_error (cnx->priv->ssl, n);
912
913 if ((rv == SSL_ERROR_WANT_READ ||
914 rv == SSL_ERROR_WANT_WRITE) &&
915 (cnx->options & LINK_CONNECTION_NONBLOCKING))
916 goto out;
917 else
918 goto fatal_error;
919 } else
920 #endif
921 {
922 if (errno == EINTR)
923 continue;
924
925 else if (errno == EAGAIN &&
926 (cnx->options & LINK_CONNECTION_NONBLOCKING))
927 goto out;
928
929 else if (errno == EBADF) {
930 g_warning ("Serious fd usage error %d", cnx->priv->fd);
931 goto fatal_error;
932
933 } else
934 goto fatal_error;
935 }
936
937 } else if (n == 0) {
938 d_printf ("we got EOF on fd %d\n", cnx->priv->fd);
939 bytes_read = LINK_IO_FATAL_ERROR;
940 goto out;
941 } else {
942 buf += n;
943 len -= n;
944 bytes_read += n;
945 #ifdef CONNECTION_DEBUG
946 cnx->priv->total_read_bytes += n;
947 #endif
948 }
949 } while (len > 0 && block_for_full_read);
950
951 #ifdef CONNECTION_DEBUG
952 d_printf ("we read %d bytes (total %"G_GUINT64_FORMAT")\n",
953 bytes_read, cnx->priv->total_read_bytes);
954 #endif
955
956 out:
957 CNX_UNLOCK (cnx);
958 return bytes_read;
959
960 fatal_error:
961 CNX_UNLOCK (cnx);
962 return LINK_IO_FATAL_ERROR;
963 }
964
965 /* Determine the maximum size of the iovec vector */
966
967 #if defined (MAXIOV) /* HPUX */
968 # define LINK_IOV_MAX (MAXIOV)
969 #elif defined (IOV_MAX) /* AIX */
970 # define LINK_IOV_MAX (IOV_MAX)
971 #elif defined (_SC_IOV_MAX) /* SGI */
972 # define LINK_IOV_MAX_INIT (sysconf (_SC_IOV_MAX))
973 #elif defined (__APPLE__)
974 /* Even though the write(2) man page mentions it, UIO_MAXIOV is only
975 * available if KERNEL is defined on MacOS X 10.1
976 */
977 # define LINK_IOV_MAX 1024
978 #elif defined (UIO_MAXIOV) /* Glibc */
979 # define LINK_IOV_MAX (UIO_MAXIOV)
980 #else /* Safe Guess */
981 # define LINK_IOV_MAX 16
982 #endif
983
984 /* If the value requires initialization, define the function here */
985 #if defined (LINK_IOV_MAX_INIT)
986 # define LINK_IOV_MAX link_iov_max
987 static guint link_iov_max = 0;
988 static inline void
link_iov_max_init()989 link_iov_max_init ()
990 {
991 if (link_iov_max == 0)
992 {
993 gint max;
994 G_LOCK_DEFINE_STATIC (link_iov_max);
995 G_LOCK (link_iov_max);
996 if (link_iov_max == 0)
997 {
998 max = LINK_IOV_MAX_INIT;
999 if (max <= 0)
1000 max = 16;
1001 link_iov_max = max;
1002 }
1003 G_UNLOCK (link_iov_max);
1004 }
1005 }
1006 #else
1007 # define link_iov_max_init()
1008 #endif
1009
1010 static glong
write_data_T(LinkConnection * cnx,QueuedWrite * qw)1011 write_data_T (LinkConnection *cnx, QueuedWrite *qw)
1012 {
1013 glong bytes_written = 0;
1014
1015 g_return_val_if_fail (cnx->status == LINK_CONNECTED,
1016 LINK_IO_FATAL_ERROR);
1017
1018 link_iov_max_init ();
1019
1020 while ((qw->nvecs > 0) && (qw->vecs->iov_len > 0)) {
1021 int n;
1022
1023 d_printf ("write_data %ld bytes to fd %d - ",
1024 calc_size (qw->vecs, qw->nvecs), cnx->priv->fd);
1025
1026 #ifdef LINK_SSL_SUPPORT
1027 if (cnx->options & LINK_CONNECTION_SSL)
1028 n = SSL_write (cnx->priv->ssl, qw->vecs->iov_base,
1029 qw->vecs->iov_len);
1030 else
1031 #endif
1032 #ifdef HAVE_WINSOCK2_H
1033 {
1034 if (WSASend (cnx->priv->fd, qw->vecs,
1035 MIN (qw->nvecs, LINK_IOV_MAX),
1036 (LPDWORD) &n, 0, NULL, NULL) == SOCKET_ERROR) {
1037 if (WSAGetLastError () == WSAEWOULDBLOCK)
1038 link_win32_watch_set_write_wouldblock (cnx->priv->tag, TRUE);
1039 n = -1;
1040 link_map_winsock_error_to_errno ();
1041 d_printf ("WSASend failed: %s\n",
1042 link_strerror (errno));
1043 } else {
1044 link_win32_watch_set_write_wouldblock (cnx->priv->tag, FALSE);
1045 }
1046 }
1047 #else
1048 LINK_TEMP_FAILURE_RETRY_SOCKET (writev (cnx->priv->fd,
1049 qw->vecs,
1050 MIN (qw->nvecs, LINK_IOV_MAX)), n);
1051 #endif
1052 #ifdef CONNECTION_DEBUG
1053 d_printf ("wrote %d bytes (total %"G_GUINT64_FORMAT")\n",
1054 n,
1055 (cnx->priv->total_written_bytes += ((n > 0) ? n : 0),
1056 cnx->priv->total_written_bytes));
1057 #endif
1058 if (n < 0) {
1059 #ifdef LINK_SSL_SUPPORT
1060 if (cnx->options & LINK_CONNECTION_SSL) {
1061 gulong rv;
1062
1063 rv = SSL_get_error (cnx->priv->ssl, n);
1064
1065 if ((rv == SSL_ERROR_WANT_READ ||
1066 rv == SSL_ERROR_WANT_WRITE) &&
1067 cnx->options & LINK_CONNECTION_NONBLOCKING)
1068 return LINK_IO_QUEUED_DATA;
1069 else
1070 return LINK_IO_FATAL_ERROR;
1071 } else
1072 #endif
1073 {
1074 if (errno == EINTR)
1075 continue;
1076
1077 else if (errno == EAGAIN &&
1078 (cnx->options & LINK_CONNECTION_NONBLOCKING))
1079 return LINK_IO_QUEUED_DATA;
1080
1081 else if (errno == EBADF)
1082 g_warning ("Serious fd usage error %d", cnx->priv->fd);
1083
1084 return LINK_IO_FATAL_ERROR; /* Unhandlable error */
1085 }
1086
1087 } else if (n == 0) /* CHECK: is this really an error condition */
1088 return LINK_IO_FATAL_ERROR;
1089
1090 else {
1091 bytes_written += n;
1092
1093 while (qw->nvecs > 0 && n >= qw->vecs->iov_len) {
1094 n -= qw->vecs->iov_len;
1095 qw->nvecs--;
1096 qw->vecs++;
1097 }
1098
1099 if (n) {
1100 qw->vecs->iov_len -= n;
1101 qw->vecs->iov_base = (guchar *)qw->vecs->iov_base + n;
1102 }
1103 }
1104 }
1105
1106 return bytes_written;
1107 }
1108
1109 static gboolean
link_connection_should_block(LinkConnection * cnx,const LinkWriteOpts * opt_write_opts)1110 link_connection_should_block (LinkConnection *cnx,
1111 const LinkWriteOpts *opt_write_opts)
1112 {
1113 if (!opt_write_opts)
1114 return TRUE;
1115
1116 if (opt_write_opts->block_on_write)
1117 return TRUE;
1118
1119 return FALSE;
1120 }
1121
1122 /* Always called in the I/O thread */
1123 static void
link_connection_flush_write_queue_T_R(LinkConnection * cnx)1124 link_connection_flush_write_queue_T_R (LinkConnection *cnx)
1125 {
1126 gboolean done_writes = TRUE;
1127
1128 if (cnx->priv->write_queue) {
1129 glong status;
1130 QueuedWrite *qw = cnx->priv->write_queue->data;
1131
1132 status = write_data_T (cnx, qw);
1133
1134 d_printf ("Wrote queue %ld on fd %d\n", status, cnx->priv->fd);
1135
1136 if (status >= LINK_IO_OK) {
1137 cnx->priv->write_queue = g_list_delete_link
1138 (cnx->priv->write_queue, cnx->priv->write_queue);
1139 queued_write_free (qw);
1140
1141 queue_signal_T_R (cnx, -status);
1142
1143 done_writes = (cnx->priv->write_queue == NULL);
1144
1145 } else {
1146 if (status == LINK_IO_FATAL_ERROR) {
1147 d_printf ("Fatal error on queued write\n");
1148 link_connection_state_changed_T_R (cnx, LINK_DISCONNECTED);
1149
1150 } else {
1151 d_printf ("Write blocked\n");
1152 done_writes = FALSE;
1153 }
1154 }
1155 }
1156
1157 d_printf ("Blocked write queue %s\n", done_writes ?
1158 "flushed & empty" : "still active");
1159
1160 if (done_writes) /* drop G_IO_OUT */
1161 link_watch_set_condition
1162 (cnx->priv->tag,
1163 LINK_ERR_CONDS | LINK_IN_CONDS);
1164 else
1165 link_watch_set_condition
1166 (cnx->priv->tag,
1167 LINK_ERR_CONDS | LINK_IN_CONDS | G_IO_OUT);
1168 }
1169
1170 void
link_connection_exec_set_condition(LinkCommandSetCondition * cmd,gboolean immediate)1171 link_connection_exec_set_condition (LinkCommandSetCondition *cmd, gboolean immediate)
1172 {
1173 d_printf ("Exec defered set condition on %p -> 0x%x\n",
1174 cmd->cnx, cmd->condition);
1175
1176 if (!immediate)
1177 CNX_LOCK (cmd->cnx);
1178
1179 link_watch_set_condition (cmd->cnx->priv->tag, cmd->condition);
1180
1181 if (!immediate)
1182 link_connection_unref_unlock (cmd->cnx);
1183
1184 else /* special */
1185 link_connection_unref_T_ (cmd->cnx);
1186
1187 g_free (cmd);
1188 }
1189
1190 /**
1191 * link_connection_writev:
1192 * @cnx: the connection to write to
1193 * @vecs: a structure of iovecs to write - this is altered.
1194 * @nvecs: the number of populated iovecs
1195 * @opt_write_opts: optional write options, or NULL
1196 *
1197 * This routine writes data to the abstract connection.
1198 * FIXME: it allows re-enterancy via link_connection_iterate
1199 * in certain cases.
1200 * FIXME: on this basis, the connection can die underneath
1201 * our feet.
1202 *
1203 * Return value: 0 on success, non 0 on error.
1204 **/
1205 LinkIOStatus
link_connection_writev(LinkConnection * cnx,struct iovec * vecs,int nvecs,const LinkWriteOpts * opt_write_opts)1206 link_connection_writev (LinkConnection *cnx,
1207 struct iovec *vecs,
1208 int nvecs,
1209 const LinkWriteOpts *opt_write_opts)
1210 {
1211 QueuedWrite qw;
1212 int status;
1213
1214 CNX_LOCK (cnx);
1215 link_connection_ref_T (cnx);
1216
1217 if (link_thread_safe ()) {
1218 d_printf ("Thread safe writev\n");
1219 if (cnx->status == LINK_CONNECTING) {
1220 queue_flattened_T_R (cnx, vecs, nvecs, TRUE);
1221 link_connection_unref_unlock (cnx);
1222 return LINK_IO_QUEUED_DATA;
1223 }
1224 } else if (cnx->options & LINK_CONNECTION_NONBLOCKING)
1225 link_connection_wait_connected (cnx);
1226
1227 if (cnx->status == LINK_DISCONNECTED) {
1228 link_connection_unref_unlock (cnx);
1229 return LINK_IO_FATAL_ERROR;
1230 }
1231
1232 if (cnx->priv->write_queue) {
1233 /* FIXME: we should really retry the write here, but we'll
1234 * get a POLLOUT for this lot at some stage anyway */
1235 queue_flattened_T_R (cnx, vecs, nvecs, FALSE);
1236 link_connection_unref_unlock (cnx);
1237 return LINK_IO_QUEUED_DATA;
1238 }
1239
1240 qw.vecs = vecs;
1241 qw.nvecs = nvecs;
1242
1243 continue_write:
1244 status = write_data_T (cnx, &qw);
1245
1246 if (status == LINK_IO_QUEUED_DATA) {
1247 if (link_thread_safe ()) {
1248 queue_flattened_T_R (cnx, qw.vecs, qw.nvecs, TRUE);
1249 link_connection_unref_unlock (cnx);
1250 return LINK_IO_QUEUED_DATA;
1251 }
1252
1253 /* Queue data & listen for buffer space */
1254 link_watch_set_condition
1255 (cnx->priv->tag,
1256 LINK_ERR_CONDS | LINK_IN_CONDS | G_IO_OUT);
1257
1258 if (!link_connection_should_block (cnx, opt_write_opts)) {
1259 queue_flattened_T_R (cnx, qw.vecs, qw.nvecs, FALSE);
1260 link_connection_unref_unlock (cnx);
1261 return LINK_IO_QUEUED_DATA;
1262
1263 } else {
1264 link_main_iteration (TRUE);
1265 goto continue_write;
1266 }
1267
1268 } else if (status >= LINK_IO_OK)
1269 status = LINK_IO_OK;
1270
1271 link_connection_unref_unlock (cnx);
1272
1273 return status;
1274 }
1275
1276 /**
1277 * link_connection_write:
1278 * @cnx: the connection to write to
1279 * @buf: a pointer to the start of an array of bytes
1280 * @len: the length of the array in bytes
1281 * @opt_write_opts: optional write options, or NULL
1282 *
1283 * Writes a contiguous block of data to the abstract connection.
1284 *
1285 * FIXME: it allows re-enterancy via link_connection_iterate
1286 * in certain cases.
1287 * FIXME: on this basis, the connection can die underneath
1288 * our feet eg. between the main_iteration and the
1289 * g_return_if_fail.
1290 *
1291 * Return value: 0 on success, non 0 on error.
1292 **/
1293 LinkIOStatus
link_connection_write(LinkConnection * cnx,const guchar * buf,gulong len,const LinkWriteOpts * opt_write_opts)1294 link_connection_write (LinkConnection *cnx,
1295 const guchar *buf,
1296 gulong len,
1297 const LinkWriteOpts *opt_write_opts)
1298 {
1299 struct iovec vec;
1300
1301 vec.iov_base = (guchar *) buf;
1302 vec.iov_len = len;
1303
1304 return link_connection_writev (cnx, &vec, 1, opt_write_opts);
1305 }
1306
1307 static void
link_connection_dispose(GObject * obj)1308 link_connection_dispose (GObject *obj)
1309 {
1310 LinkConnection *cnx = (LinkConnection *)obj;
1311
1312 d_printf ("dispose connection %p\n", obj);
1313
1314 link_source_remove (cnx);
1315 queue_free (cnx);
1316
1317 parent_class->dispose (obj);
1318 }
1319
1320 static void
link_connection_finalize(GObject * obj)1321 link_connection_finalize (GObject *obj)
1322 {
1323 GSList *l;
1324 LinkConnection *cnx = (LinkConnection *)obj;
1325
1326 link_close_fd (cnx);
1327
1328 for (l = cnx->idle_broken_callbacks; l; l = l->next)
1329 g_free (l->data);
1330 g_slist_free (cnx->idle_broken_callbacks);
1331
1332 g_free (cnx->remote_host_info);
1333 g_free (cnx->remote_serv_info);
1334
1335 g_free (cnx->priv);
1336
1337 if (cnx->timeout_mutex)
1338 g_mutex_free (cnx->timeout_mutex);
1339 if (cnx->timeout_source_id)
1340 link_io_thread_remove_timeout (cnx->timeout_source_id);
1341
1342 parent_class->finalize (obj);
1343 }
1344
1345 static void
link_connection_init(LinkConnection * cnx)1346 link_connection_init (LinkConnection *cnx)
1347 {
1348 d_printf ("create new connection %p\n", cnx);
1349
1350 cnx->priv = g_new0 (LinkConnectionPrivate, 1);
1351 cnx->priv->fd = -1;
1352 cnx->priv->was_disconnected = FALSE;
1353
1354 cnx->timeout_mutex = NULL;
1355 cnx->timeout_msec = 0;
1356 cnx->timeout_source_id = 0;
1357 cnx->timeout_status = LINK_TIMEOUT_UNKNOWN;
1358 cnx->tdata = NULL;
1359
1360 #ifdef CONNECTION_DEBUG
1361 cnx->priv->total_read_bytes = 0;
1362 cnx->priv->total_written_bytes = 0;
1363 #endif
1364 }
1365
1366 static void
link_connection_class_init(LinkConnectionClass * klass)1367 link_connection_class_init (LinkConnectionClass *klass)
1368 {
1369 GObjectClass *object_class = (GObjectClass *) klass;
1370
1371 object_class->dispose = link_connection_dispose;
1372 object_class->finalize = link_connection_finalize;
1373
1374 signals [BROKEN] =
1375 g_signal_new ("broken",
1376 G_TYPE_FROM_CLASS (object_class),
1377 G_SIGNAL_RUN_LAST,
1378 G_STRUCT_OFFSET (LinkConnectionClass, broken),
1379 NULL, NULL,
1380 g_cclosure_marshal_VOID__VOID,
1381 G_TYPE_NONE, 0);
1382
1383 signals [BLOCKING] =
1384 g_signal_new ("blocking",
1385 G_TYPE_FROM_CLASS (object_class),
1386 G_SIGNAL_RUN_LAST,
1387 G_STRUCT_OFFSET (LinkConnectionClass, blocking),
1388 NULL, NULL,
1389 g_cclosure_marshal_VOID__ULONG,
1390 G_TYPE_NONE, 1, G_TYPE_ULONG);
1391
1392 parent_class = g_type_class_peek_parent (klass);
1393 }
1394
1395 GType
link_connection_get_type(void)1396 link_connection_get_type (void)
1397 {
1398 static GType object_type = 0;
1399
1400 if (!object_type) {
1401 static const GTypeInfo object_info = {
1402 sizeof (LinkConnectionClass),
1403 (GBaseInitFunc) NULL,
1404 (GBaseFinalizeFunc) NULL,
1405 (GClassInitFunc) link_connection_class_init,
1406 NULL, /* class_finalize */
1407 NULL, /* class_data */
1408 sizeof (LinkConnection),
1409 0, /* n_preallocs */
1410 (GInstanceInitFunc) link_connection_init
1411 };
1412
1413 object_type = g_type_register_static (G_TYPE_OBJECT,
1414 "LinkConnection",
1415 &object_info,
1416 0);
1417 }
1418
1419 return object_type;
1420 }
1421
1422
1423 LinkWriteOpts *
link_write_options_new(gboolean block_on_write)1424 link_write_options_new (gboolean block_on_write)
1425 {
1426 LinkWriteOpts *write_opts = g_new0 (LinkWriteOpts, 1);
1427
1428 write_opts->block_on_write = block_on_write;
1429
1430 return write_opts;
1431 }
1432
1433 void
link_write_options_free(LinkWriteOpts * write_opts)1434 link_write_options_free (LinkWriteOpts *write_opts)
1435 {
1436 g_free (write_opts);
1437 }
1438
1439 void
link_connection_set_max_buffer(LinkConnection * cnx,gulong max_buffer_bytes)1440 link_connection_set_max_buffer (LinkConnection *cnx,
1441 gulong max_buffer_bytes)
1442 {
1443 g_return_if_fail (cnx != NULL);
1444
1445 CNX_LOCK (cnx);
1446 /* FIXME: we might want to check the current buffer size */
1447 cnx->priv->max_buffer_bytes = max_buffer_bytes;
1448
1449 CNX_UNLOCK (cnx);
1450 }
1451
1452 static gboolean
link_connection_io_handler(GIOChannel * gioc,GIOCondition condition,gpointer data)1453 link_connection_io_handler (GIOChannel *gioc,
1454 GIOCondition condition,
1455 gpointer data)
1456 {
1457 LinkConnection *cnx = data;
1458 LinkConnectionClass *klass;
1459
1460 d_printf ("link_connection_io_handler fd %d, 0x%x\n",
1461 cnx->priv->fd, condition);
1462
1463 CNX_LOCK (cnx);
1464 link_connection_ref_T (cnx);
1465
1466 klass = (LinkConnectionClass *) G_TYPE_INSTANCE_GET_CLASS (
1467 data, LINK_TYPE_CONNECTION, LinkConnection);
1468
1469 if (cnx->status == LINK_CONNECTED &&
1470 condition & LINK_IN_CONDS && klass->handle_input) {
1471
1472 d_printf ("Handle input on fd %d\n", cnx->priv->fd);
1473
1474 CNX_UNLOCK (cnx);
1475 klass->handle_input (cnx);
1476 CNX_LOCK (cnx);
1477 }
1478
1479 if (cnx->status == LINK_CONNECTED && condition & G_IO_OUT) {
1480 d_printf ("IO Out - buffer space free ...\n");
1481 link_connection_flush_write_queue_T_R (cnx);
1482 }
1483
1484 if (condition & (LINK_ERR_CONDS | G_IO_OUT)) {
1485 int rv, n;
1486 LinkSockLen n_size = sizeof (n);
1487
1488 switch (cnx->status) {
1489 case LINK_CONNECTING:
1490 n = 0;
1491 rv = getsockopt (cnx->priv->fd, SOL_SOCKET, SO_ERROR, (char *) &n, &n_size);
1492 if (!rv && !n && condition == G_IO_OUT) {
1493 d_printf ("State changed to connected on %d\n", cnx->priv->fd);
1494
1495 link_watch_set_condition (
1496 cnx->priv->tag,
1497 LINK_ERR_CONDS | LINK_IN_CONDS);
1498
1499 link_connection_state_changed_T_R (cnx, LINK_CONNECTED);
1500
1501 if (cnx->priv->write_queue) {
1502 d_printf ("Connected, with queued writes, start flush ...\n");
1503 link_connection_flush_write_queue_T_R (cnx);
1504 }
1505 } else {
1506 d_printf ("Error connecting %d %d %d on fd %d\n",
1507 rv, n, errno, cnx->priv->fd);
1508 link_connection_state_changed_T_R (cnx, LINK_DISCONNECTED);
1509 }
1510 break;
1511 case LINK_CONNECTED: {
1512 if (condition & LINK_ERR_CONDS) {
1513 d_printf ("Disconnect on err: %d\n", cnx->priv->fd);
1514 link_connection_state_changed_T_R (cnx, LINK_DISCONNECTED);
1515 }
1516 break;
1517 }
1518 default:
1519 break;
1520 }
1521 }
1522
1523 link_connection_unref_unlock (cnx);
1524
1525 return TRUE;
1526 }
1527
1528 LinkConnectionStatus
link_connection_get_status(LinkConnection * cnx)1529 link_connection_get_status (LinkConnection *cnx)
1530 {
1531 LinkConnectionStatus status;
1532
1533 CNX_LOCK (cnx);
1534 status = cnx->status;
1535 CNX_UNLOCK (cnx);
1536
1537 d_printf ("Get status on %p = %d\n", cnx, status);
1538
1539 return status;
1540 }
1541
1542 void
link_connection_exec_disconnect(LinkCommandDisconnect * cmd,gboolean immediate)1543 link_connection_exec_disconnect (LinkCommandDisconnect *cmd, gboolean immediate)
1544 {
1545 d_printf ("Exec defered disconnect on %p\n", cmd->cnx);
1546
1547 link_connection_state_changed (cmd->cnx, LINK_DISCONNECTED);
1548
1549 link_connection_unref (cmd->cnx);
1550 g_free (cmd);
1551 }
1552
1553 void
link_connection_disconnect(LinkConnection * cnx)1554 link_connection_disconnect (LinkConnection *cnx)
1555 {
1556 LinkCommandDisconnect *cmd;
1557
1558 cmd = g_new (LinkCommandDisconnect, 1);
1559 cmd->cmd.type = LINK_COMMAND_DISCONNECT;
1560 cmd->cnx = link_connection_ref (cnx);
1561
1562 link_exec_command ((LinkCommand *) cmd);
1563 }
1564
1565 LinkConnectionStatus
link_connection_wait_connected(LinkConnection * cnx)1566 link_connection_wait_connected (LinkConnection *cnx)
1567 {
1568 LinkConnectionStatus status;
1569
1570 CNX_LOCK (cnx);
1571
1572 status = link_connection_wait_connected_T (cnx);
1573
1574 CNX_UNLOCK (cnx);
1575
1576 return status;
1577 }
1578
1579 void
link_connections_move_io_T(gboolean to_io_thread)1580 link_connections_move_io_T (gboolean to_io_thread)
1581 {
1582 GList *l;
1583 for (l = cnx_list; l; l = l->next) {
1584 LinkConnection *cnx = l->data;
1585 link_watch_move_io (cnx->priv->tag, to_io_thread);
1586 }
1587 }
1588
1589 void
link_connection_add_broken_cb(LinkConnection * cnx,LinkBrokenCallback fn,gpointer user_data)1590 link_connection_add_broken_cb (LinkConnection *cnx,
1591 LinkBrokenCallback fn,
1592 gpointer user_data)
1593 {
1594 BrokenCallback *bc = g_new0 (BrokenCallback, 1);
1595
1596 g_return_if_fail (fn != NULL);
1597
1598 bc->fn = fn;
1599 bc->user_data = user_data;
1600
1601 cnx->idle_broken_callbacks = g_slist_prepend (cnx->idle_broken_callbacks, bc);
1602 }
1603
1604 static gboolean
broken_callback_match(BrokenCallback * bc,LinkBrokenCallback fn,gpointer user_data)1605 broken_callback_match (BrokenCallback *bc,
1606 LinkBrokenCallback fn,
1607 gpointer user_data)
1608 {
1609 return ( (!fn || bc->fn == fn) &&
1610 (!user_data || bc->user_data == user_data) );
1611 }
1612
1613 void
link_connection_remove_broken_cb(LinkConnection * cnx,LinkBrokenCallback opt_fn,gpointer opt_user_data)1614 link_connection_remove_broken_cb (LinkConnection *cnx,
1615 LinkBrokenCallback opt_fn,
1616 gpointer opt_user_data)
1617 {
1618 GSList *l, *next;
1619
1620 CNX_LOCK (cnx);
1621
1622 for (l = cnx->idle_broken_callbacks; l; l = next) {
1623 next = l->next;
1624 if (broken_callback_match (l->data, opt_fn, opt_user_data)) {
1625 g_free (l->data);
1626 cnx->idle_broken_callbacks =
1627 g_slist_delete_link (cnx->idle_broken_callbacks,
1628 l);
1629 }
1630 }
1631
1632 CNX_UNLOCK (cnx);
1633 }
1634
1635 void
link_connections_close(void)1636 link_connections_close (void)
1637 {
1638 GList *cnx, *l;
1639
1640 if (!link_in_io_thread ())
1641 return;
1642
1643 CNX_LIST_LOCK();
1644 cnx = cnx_list;
1645 cnx_list = NULL;
1646 CNX_LIST_UNLOCK();
1647
1648 if (!cnx)
1649 return;
1650
1651 /* FIXME: Need to shutdown linc connections ... */
1652
1653 for (l = cnx; l; l = l->next)
1654 g_object_run_dispose (l->data);
1655
1656 g_list_free (cnx);
1657 }
1658
1659 void
link_set_timeout(guint msec)1660 link_set_timeout (guint msec)
1661 {
1662 _link_timeout = msec;
1663 }
1664
1665