1 /*
2  * linc-connection.c: This file is part of the linc library.
3  *
4  * Authors:
5  *    Elliot Lee     (sopwith@redhat.com)
6  *    Michael Meeks  (michael@ximian.com)
7  *    Mark McLouglin (mark@skynet.ie) & others
8  *
9  * Copyright 2001, Red Hat, Inc., Ximian, Inc.,
10  *                 Sun Microsystems, Inc.
11  */
12 #include <config.h>
13 #include <stdarg.h>
14 #include <fcntl.h>
15 #include <errno.h>
16 #include <string.h>
17 #include <stdio.h>
18 #include <ctype.h>
19 
20 #ifdef LINK_SSL_SUPPORT
21 #  include <openssl/ssl.h>
22 #endif
23 
24 #include "linc-private.h"
25 #include "linc-compat.h"
26 #include <linc/linc-config.h>
27 #include <linc/linc-connection.h>
28 
/* NOTE(review): presumably filled in by the class initialiser; not set in the visible part of this file. */
static GObjectClass *parent_class = NULL;
/*
 * Default timeout that link_connection_from_fd_T() copies into
 * cnx->timeout_msec for AF_INET / AF_INET6 connections when it is
 * non-zero and the connection has no timeout of its own yet.
 */
static guint _link_timeout = 0;

/* Indices into signals[]; LAST_SIGNAL is the number of entries. */
enum {
	BROKEN,
	BLOCKING,
	LAST_SIGNAL
};
static guint  signals [LAST_SIGNAL];
/*
 * Connections registered by link_connection_from_fd_T(); an entry is
 * removed by link_connection_unref_unlock() when the last ref goes.
 */
static GList *cnx_list = NULL;

/*
 * Locking helpers. Every variant ignores its argument and maps onto the
 * single link_lock() / link_unlock() pair.
 *
 * NOTE(review): the four *_LIST_* macros expand with a trailing ';', so
 * a use written as "CNX_LIST_LOCK();" yields an extra empty statement.
 * Do not use them as the unbraced body of an if/else.
 */
#define CNX_LOCK(cnx)    G_STMT_START { link_lock();   } G_STMT_END
#define CNX_UNLOCK(cnx)  G_STMT_START { link_unlock(); } G_STMT_END
#define CNX_LIST_LOCK()      CNX_LOCK(0);   /* for now */
#define CNX_LIST_UNLOCK()    CNX_UNLOCK(0); /* for now */
#define CNX_AND_LIST_LOCK(cnx)   CNX_LOCK(cnx);   /* for now */
#define CNX_AND_LIST_UNLOCK(cnx) CNX_UNLOCK(cnx); /* for now */
#define CNX_IS_LOCKED(cnx) link_is_locked()

/* Watch callback installed by link_source_add(); defined later in the file. */
static gboolean link_connection_io_handler (GIOChannel  *gioc,
					    GIOCondition condition,
					    gpointer     data);
51 
52 static inline LinkConnection*
link_connection_ref_T(LinkConnection * cnx)53 link_connection_ref_T (LinkConnection *cnx)
54 {
55 	return LINK_CONNECTION (g_object_ref (G_OBJECT (cnx)));
56 }
57 
/*
 * link_connection_ref:
 * @cnx: the connection to reference
 *
 * Takes the link lock, adds one GObject reference to @cnx and releases
 * the lock again.
 *
 * Return value: @cnx itself.
 */
LinkConnection *
link_connection_ref (LinkConnection *cnx)
{
	CNX_AND_LIST_LOCK (cnx);
	g_object_ref (G_OBJECT (cnx));
	CNX_AND_LIST_UNLOCK (cnx);

	return cnx;
}
67 
68 /* Only call if we are _certain_ that we don't hold the last ref */
69 static void
link_connection_unref_T_(LinkConnection * cnx)70 link_connection_unref_T_ (LinkConnection *cnx)
71 {
72 	g_assert (((GObject *)cnx)->ref_count > 1);
73 	g_object_unref (G_OBJECT (cnx));
74 }
75 
76 static void
link_connection_unref_unlock(LinkConnection * cnx)77 link_connection_unref_unlock (LinkConnection *cnx)
78 {
79 	gboolean tail_unref = FALSE;
80 
81 	if (((GObject *)cnx)->ref_count > 1)
82 		g_object_unref (G_OBJECT (cnx));
83 
84 	else {
85 		cnx_list = g_list_remove (cnx_list, cnx);
86 		tail_unref = TRUE;
87 	}
88 
89 	CNX_AND_LIST_UNLOCK (cnx);
90 
91 	if (tail_unref) {
92 		LinkCommandCnxUnref cmd[1];
93 
94 		cmd->cmd.cmd.type = LINK_COMMAND_CNX_UNREF;
95 		cmd->cmd.complete = FALSE;
96 		cmd->cnx = cnx;
97 		link_exec_command ((LinkCommand *) cmd);
98 	}
99 }
100 
101 void
link_connection_exec_cnx_unref(LinkCommandCnxUnref * cmd,gboolean immediate)102 link_connection_exec_cnx_unref (LinkCommandCnxUnref *cmd, gboolean immediate)
103 {
104 	d_printf ("Exec defered unref on %p\n", cmd->cnx);
105 
106 	if (immediate) /* In I/O thread - with just 1 ref left */
107 		g_object_unref (G_OBJECT (cmd->cnx));
108 	else {
109 		CNX_AND_LIST_LOCK (cmd->cnx);
110 		link_connection_unref_unlock (cmd->cnx);
111 	}
112 }
113 
/*
 * link_connection_unref:
 * @cnx: the connection to release; NULL is rejected by g_return_if_fail()
 *
 * Takes the lock and releases one reference through
 * link_connection_unref_unlock(), which is also what drops the lock
 * again - hence no explicit unlock here.
 */
void
link_connection_unref (LinkConnection *cnx)
{
	g_return_if_fail (cnx != NULL);

	CNX_AND_LIST_LOCK (cnx);

	link_connection_unref_unlock (cnx);
}
123 
124 static void
link_close_fd(LinkConnection * cnx)125 link_close_fd (LinkConnection *cnx)
126 {
127 	if (cnx->priv->fd >= 0) {
128 		d_printf ("link_close_fd: closing %d\n", cnx->priv->fd);
129 		LINK_CLOSE_SOCKET (cnx->priv->fd);
130 	}
131 	cnx->priv->fd = -1;
132 }
133 
/*
 * One registered 'broken' callback. link_connection_emit_broken()
 * invokes fn (cnx, user_data) and then g_free()s the record.
 */
typedef struct {
	LinkBrokenCallback fn;        /* function to invoke */
	gpointer           user_data; /* passed back to fn unchanged */
} BrokenCallback;
138 
139 static void
link_connection_emit_broken(LinkConnection * cnx,GSList * callbacks)140 link_connection_emit_broken (LinkConnection *cnx, GSList *callbacks)
141 {
142 	GSList *l;
143 
144 	for (l = callbacks; l; l = l->next) {
145 		BrokenCallback *bc = l->data;
146 		bc->fn (cnx, bc->user_data);
147 		g_free (bc);
148 	}
149 	g_slist_free (callbacks);
150 }
151 
152 /*
153  * Unfortunate to have a global list, but we need to know
154  * if this is being processed in the main thread & if so
155  * simply append to it.
156  */
157 static GSList *idle_broken_cnxs = NULL;
158 
159 static gboolean
link_connection_broken_idle(gpointer dummy)160 link_connection_broken_idle (gpointer dummy)
161 {
162 	GSList *callbacks;
163 	LinkConnection *cnx;
164 
165 	d_printf ("Connection broken idle ...\n");
166 
167 	do {
168 		link_lock();
169 		cnx = NULL;
170 		if (idle_broken_cnxs != NULL) {
171 			cnx = idle_broken_cnxs->data;
172 			idle_broken_cnxs = g_slist_delete_link (idle_broken_cnxs, idle_broken_cnxs);
173 		}
174 		if (cnx) {
175 			callbacks = cnx->idle_broken_callbacks;
176 			cnx->idle_broken_callbacks = NULL;
177 			cnx->inhibit_reconnect = FALSE;
178 			link_signal ();
179 		}
180 		link_unlock ();
181 
182 		if (cnx) {
183 			link_connection_emit_broken (cnx, callbacks);
184 			link_connection_unref (cnx);
185 		}
186 	} while (cnx != NULL);
187 
188 	return FALSE;
189 }
190 
191 static void
add_idle_broken_for_cnx_T(LinkConnection * cnx)192 add_idle_broken_for_cnx_T (LinkConnection *cnx)
193 {
194 	if (idle_broken_cnxs != NULL) {
195 		fprintf (stderr, "Deadlock potential - avoiding evil bug!\n");
196 		/*
197 		 * just append ourself & exit - we don't want to
198 		 * inhibit re-connection because of the horrendous
199 		 * deadlock possible, cf. g#534351#
200 		 */
201 		if (g_slist_find (idle_broken_cnxs, cnx) != NULL)
202 			return;
203 	} else {
204 		/* inhibit reconnect while we emit 'broken' */
205 		cnx->inhibit_reconnect = TRUE;
206 		g_idle_add (link_connection_broken_idle, NULL);
207 	}
208 	link_connection_ref_T (cnx);
209 	idle_broken_cnxs = g_slist_prepend (idle_broken_cnxs, cnx);
210 }
211 
212 static void
link_source_remove(LinkConnection * cnx)213 link_source_remove (LinkConnection *cnx)
214 {
215 	if (cnx->priv->tag) {
216 		LinkWatch *thewatch = cnx->priv->tag;
217 		cnx->priv->tag = NULL;
218 		link_io_remove_watch (thewatch);
219 		d_printf ("Removed watch on %d\n", cnx->priv->fd);
220 	}
221 }
222 
223 static void
link_source_add(LinkConnection * cnx,GIOCondition condition)224 link_source_add (LinkConnection *cnx,
225 		 GIOCondition    condition)
226 {
227 	g_assert (cnx->priv->tag == NULL);
228 
229 	cnx->priv->tag = link_io_add_watch_fd (
230 		cnx->priv->fd, condition,
231 		link_connection_io_handler, cnx);
232 
233 	d_printf ("Added watch on %d (0x%x)\n",
234 		 cnx->priv->fd, condition);
235 }
236 
/*
 * One pending write on a connection's write queue.
 */
typedef struct {
	guchar       *data;       /* heap buffer owned by this record; g_free()d in queued_write_free() */

	struct iovec *vecs;       /* next vector still to be written; advanced by write_data_T() */
	int           nvecs;      /* number of vectors remaining at vecs */
	struct iovec  single_vec; /* storage for the one-element vector set up by queue_flattened_T_R() */
} QueuedWrite;
244 
/* Free a queued write together with the data buffer it owns. */
static void
queued_write_free (QueuedWrite *qw)
{
	g_free (qw->data);
	g_free (qw);
}
251 
252 static void
queue_free(LinkConnection * cnx)253 queue_free (LinkConnection *cnx)
254 {
255 	GList *l;
256 
257 	for (l = cnx->priv->write_queue; l; l = l->next)
258 		queued_write_free (l->data);
259 
260 	g_list_free (cnx->priv->write_queue);
261 	cnx->priv->write_queue = NULL;
262 }
263 
/*
 * Run @cnx's pending broken callbacks right now.
 *
 * Called with the lock held. The callback list is detached from the
 * connection first, then the lock is released while the callbacks run
 * and re-taken before returning.
 */
static void
dispatch_callbacks_drop_lock (LinkConnection *cnx)
{
	GSList *callbacks;

	/* detach under the lock so nobody else dispatches the same list */
	callbacks = cnx->idle_broken_callbacks;
	cnx->idle_broken_callbacks = NULL;

	CNX_UNLOCK (cnx);
	link_connection_emit_broken (cnx, callbacks);
	CNX_LOCK (cnx);
}
276 
/*
 * link_connection_state_changed_T_R:
 * @cnx: a #LinkConnection
 * @status: a #LinkConnectionStatus value.
 *
 * Must be called with the lock held (asserted). The lock is temporarily
 * released around the 'broken' signal emission, around immediate
 * dispatch of broken callbacks and around the class state_changed hook,
 * and is held again on return.
 *
 * Set up linc's I/O watch if the connection is in the #LINK_CONNECTED
 * or #LINK_CONNECTING state.
 *
 * Remove the watch if the state is #LINK_DISCONNECTED or #LINK_TIMEOUT,
 * close the socket, drop the write queue and - if the state actually
 * changed - emit a gobject broken signal which may be caught by the
 * application, then run or schedule the registered broken callbacks.
 *
 * Also perform SSL specific operations if the connection has moved into
 * the #LINK_CONNECTED state.
 */
static void
link_connection_state_changed_T_R (LinkConnection      *cnx,
				   LinkConnectionStatus status)
{
	gboolean changed;
	LinkConnectionClass *klass;

	g_assert (CNX_IS_LOCKED (cnx));

	d_printf ("State changing from '%s' to '%s' on fd %d\n",
		 STATE_NAME (cnx->status), STATE_NAME (status),
		 cnx->priv->fd);

	/* remember whether this is a real transition before overwriting */
	changed = cnx->status != status;

	cnx->status = status;

	switch (status) {
	case LINK_CONNECTED:
#ifdef LINK_SSL_SUPPORT
		if (cnx->options & LINK_CONNECTION_SSL) {
			if (cnx->was_initiated)
				SSL_connect (cnx->priv->ssl);
			else
				SSL_accept (cnx->priv->ssl);
		}
#endif
		if (!cnx->priv->tag)
			link_source_add (cnx, LINK_ERR_CONDS | LINK_IN_CONDS);
		break;

	case LINK_CONNECTING:

		if (cnx->priv->tag) /* re-connecting */
			link_watch_set_condition (
				cnx->priv->tag,
				G_IO_OUT | LINK_ERR_CONDS);
		else
			link_source_add (cnx, G_IO_OUT | LINK_ERR_CONDS);
		break;

	case LINK_DISCONNECTED:
	case LINK_TIMEOUT:
		link_source_remove (cnx);
		link_close_fd (cnx);
		queue_free (cnx);
		/* don't free pending queue - we could get re-connected */
		if (changed) {

			/* the signal is skipped while was_disconnected is set
			 * (link_connection_try_reconnect sets it while waiting) */
			if (!cnx->priv->was_disconnected) {
				d_printf ("Emitting the broken signal on %p\n", cnx);
				CNX_UNLOCK (cnx);
				g_signal_emit (cnx, signals [BROKEN], 0);
				CNX_LOCK (cnx);
			}

			if (cnx->idle_broken_callbacks) {
				if (!link_thread_io ()) {
					d_printf ("Immediate broken callbacks at immediately\n");

					dispatch_callbacks_drop_lock (cnx);
				} else {
					d_printf ("Queuing broken callbacks at idle\n");
					add_idle_broken_for_cnx_T (cnx);
				}
			}
		}
		break;
	}

	klass = (LinkConnectionClass *)G_OBJECT_GET_CLASS (cnx);

	/* wake waiters, then call the class hook without the lock held */
	if (klass->state_changed) {
		link_signal ();
		CNX_UNLOCK (cnx);
		klass->state_changed (cnx, status);
		CNX_LOCK (cnx);
	}
}
371 
372 static void
queue_signal_T_R(LinkConnection * cnx,glong delta)373 queue_signal_T_R (LinkConnection *cnx,
374 		  glong           delta)
375 {
376 	gulong old_size;
377 	gulong new_size;
378 
379 	d_printf ("Queue signal %ld bytes, delta %ld, max %ld\n",
380 		  cnx->priv->write_queue_bytes, delta,
381 		  cnx->priv->max_buffer_bytes);
382 
383 	old_size = cnx->priv->write_queue_bytes;
384 	cnx->priv->write_queue_bytes += delta;
385 	new_size = cnx->priv->write_queue_bytes;
386 
387 	if (cnx->options & LINK_CONNECTION_BLOCK_SIGNAL) {
388 		if (new_size == 0 ||
389 		    (old_size < (cnx->priv->max_buffer_bytes >> 1) &&
390 		     new_size >= (cnx->priv->max_buffer_bytes >> 1)) ||
391 		    new_size >= cnx->priv->max_buffer_bytes) {
392 			CNX_UNLOCK (cnx);
393 			g_signal_emit (cnx, signals [BLOCKING], 0, new_size);
394 			CNX_LOCK (cnx);
395 		}
396 	}
397 
398 	if (cnx->priv->max_buffer_bytes &&
399 	    cnx->priv->write_queue_bytes >= cnx->priv->max_buffer_bytes)
400 		link_connection_state_changed_T_R (cnx, LINK_DISCONNECTED);
401 }
402 
403 static gulong
calc_size(struct iovec * src_vecs,int nvecs)404 calc_size (struct iovec *src_vecs,
405 	   int           nvecs)
406 {
407 	int i;
408 	gulong total_size = 0;
409 
410 	for (i = 0; i < nvecs; i++)
411 		total_size += src_vecs [i].iov_len;
412 
413 	return total_size;
414 }
415 
416 static void
queue_flattened_T_R(LinkConnection * cnx,struct iovec * src_vecs,int nvecs,gboolean update_poll)417 queue_flattened_T_R (LinkConnection *cnx,
418 		     struct iovec   *src_vecs,
419 		     int             nvecs,
420 		     gboolean        update_poll)
421 {
422 	int     i;
423 	guchar *p;
424 	gulong  total_size;
425 	gboolean new_queue;
426 	QueuedWrite *qw = g_new (QueuedWrite, 1);
427 
428 	total_size = calc_size (src_vecs, nvecs);
429 
430 	p = g_malloc (total_size);
431 
432 	qw->data  = p;
433 	qw->vecs  = &qw->single_vec;
434 	qw->nvecs = 1;
435 
436 	qw->vecs->iov_base = p;
437 	qw->vecs->iov_len = total_size;
438 
439 	for (i = 0; i < nvecs; i++) {
440 		memcpy (p, src_vecs [i].iov_base, src_vecs [i].iov_len);
441 		p += src_vecs [i].iov_len;
442 	}
443 	g_assert (p == (qw->data + total_size));
444 
445 	d_printf ("Queueing write of %ld bytes on fd %d\n",
446 		  total_size, cnx->priv->fd);
447 
448 	new_queue = cnx->priv->write_queue == NULL;
449 	cnx->priv->write_queue = g_list_append (cnx->priv->write_queue, qw);
450 	queue_signal_T_R (cnx, total_size);
451 
452 	if (update_poll && new_queue) {
453 		LinkCommandSetCondition *cmd;
454 
455 		cmd = g_new (LinkCommandSetCondition, 1);
456 		cmd->cmd.type = LINK_COMMAND_SET_CONDITION;
457 		cmd->cnx = link_connection_ref_T (cnx);
458 		cmd->condition = (LINK_ERR_CONDS | LINK_IN_CONDS | G_IO_OUT);
459 		link_exec_command (&cmd->cmd);
460 	}
461 }
462 
463 static void
link_connection_from_fd_T(LinkConnection * cnx,int fd,const LinkProtocolInfo * proto,gchar * remote_host_info,gchar * remote_serv_info,gboolean was_initiated,LinkConnectionStatus status,LinkConnectionOptions options)464 link_connection_from_fd_T (LinkConnection         *cnx,
465 			   int                     fd,
466 			   const LinkProtocolInfo *proto,
467 			   gchar                  *remote_host_info,
468 			   gchar                  *remote_serv_info,
469 			   gboolean                was_initiated,
470 			   LinkConnectionStatus    status,
471 			   LinkConnectionOptions   options)
472 {
473 	cnx->was_initiated = was_initiated;
474 	cnx->is_auth       = (proto->flags & LINK_PROTOCOL_SECURE);
475 	cnx->proto         = proto;
476 	cnx->options       = options;
477 	cnx->priv->fd      = fd;
478 
479 	g_free (cnx->remote_host_info);
480 	cnx->remote_host_info = remote_host_info;
481 	g_free (cnx->remote_serv_info);
482 	cnx->remote_serv_info = remote_serv_info;
483 
484 	switch (cnx->proto->family) {
485 	case AF_INET:
486 #ifdef AF_INET6
487 	case AF_INET6:
488 #endif
489 		if (_link_timeout && !cnx->timeout_msec) /* this should'nt happen twice but I'm always paranoid... */
490 			cnx->timeout_msec = _link_timeout;
491 		break;
492 	default:
493 		break;
494 	}
495 
496 	d_printf ("Cnx from fd (%d) '%s', '%s', '%s'\n",
497 		 fd, proto->name,
498 		 remote_host_info ? remote_host_info : "<Null>",
499 		 remote_serv_info ? remote_serv_info : "<Null>");
500 
501 	if (proto->setup)
502 		proto->setup (fd, options);
503 
504 #ifdef LINK_SSL_SUPPORT
505 	if (options & LINK_CONNECTION_SSL) {
506 		cnx->priv->ssl = SSL_new (link_ssl_ctx);
507 		SSL_set_fd (cnx->priv->ssl, fd);
508 	}
509 #endif
510 	g_assert (CNX_IS_LOCKED (0));
511 	link_connection_state_changed_T_R (cnx, status);
512 
513 	if (!g_list_find (cnx_list, cnx))
514 		cnx_list = g_list_prepend (cnx_list, cnx);
515 }
516 
517 /*
518  * link_connection_from_fd:
519  * @cnx: a #LinkConnection.
520  * @fd: a connected/connecting file descriptor.
521  * @proto: a #LinkProtocolInfo.
522  * @remote_host_info: protocol dependant host information; gallocation swallowed
523  * @remote_serv_info: protocol dependant service information(e.g. port number). gallocation swallowed
524  * @was_initiated: #TRUE if the connection was initiated by us.
525  * @status: a #LinkConnectionStatus value.
526  * @options: combination of #LinkConnectionOptions.
527  *
528  * Fill in @cnx, call protocol specific initialisation methonds and then
529  * call link_connection_state_changed.
530  *
531  * Return Value: #TRUE if the function succeeds, #FALSE otherwise.
532  */
533 void
link_connection_from_fd(LinkConnection * cnx,int fd,const LinkProtocolInfo * proto,gchar * remote_host_info,gchar * remote_serv_info,gboolean was_initiated,LinkConnectionStatus status,LinkConnectionOptions options)534 link_connection_from_fd (LinkConnection         *cnx,
535 			 int                     fd,
536 			 const LinkProtocolInfo *proto,
537 			 gchar                  *remote_host_info,
538 			 gchar                  *remote_serv_info,
539 			 gboolean                was_initiated,
540 			 LinkConnectionStatus    status,
541 			 LinkConnectionOptions   options)
542 {
543 	CNX_LOCK (cnx);
544 
545 	link_connection_from_fd_T (cnx, fd, proto,
546 				   remote_serv_info, remote_serv_info,
547 				   was_initiated, status, options);
548 	CNX_UNLOCK (cnx);
549 }
550 
551 #ifndef G_OS_WIN32
/*
 * fix_permissions:
 * @filename: path of a file whose owner should follow its directory
 *
 * Best-effort helper: sets the owner of @filename to the owner of the
 * directory that contains it, leaving the group untouched. Nothing is
 * done when @filename has no '/' in it or when the directory cannot be
 * stat()ed. Failures are not reported to the caller.
 */
static void
fix_permissions (const char *filename)
{
	char *tmp_dir = g_strdup (filename);
	char *p;
	struct stat stat_buf;

	if (!tmp_dir)
		return;

	p = strrchr (tmp_dir, '/');
	if (p) {
		/* cut the copy down to the containing directory */
		*p = '\0';

		/* only trust st_uid if stat() actually filled it in */
		if (stat (tmp_dir, &stat_buf) == 0)
			chown (filename, stat_buf.st_uid, -1);
	}

	/* the copy was only needed to derive the directory name */
	g_free (tmp_dir);
}
568 #endif
569 
570 static gboolean
link_connection_do_initiate(LinkConnection * cnx,const char * proto_name,const char * host,const char * service,LinkConnectionOptions options)571 link_connection_do_initiate (LinkConnection        *cnx,
572 			     const char            *proto_name,
573 			     const char            *host,
574 			     const char            *service,
575 			     LinkConnectionOptions  options)
576 {
577 	const LinkProtocolInfo *proto;
578 	int                     rv;
579 	int                     fd;
580 	gboolean                retval = FALSE;
581 	struct sockaddr        *saddr;
582 	LinkSockLen		saddr_len;
583 
584 	proto = link_protocol_find (proto_name);
585 
586 	if (!proto)
587 		return FALSE;
588 
589 	saddr = link_protocol_get_sockaddr (
590 		proto, host, service, &saddr_len);
591 
592 	if (!saddr && (strcmp (proto_name, "IPv6") ==0)) {/* Falling back to IPv4 */
593 		proto = link_protocol_find ("IPv4");
594 
595 		saddr = link_protocol_get_sockaddr (
596 			proto, host, service, &saddr_len);
597 	}
598 
599 	if (!saddr)
600 		return FALSE;
601 
602 	fd = socket (proto->family, SOCK_STREAM,
603 		     proto->stream_proto_num);
604 #ifdef HAVE_WINSOCK2_H
605 	if (fd == INVALID_SOCKET) {
606 		fd = -1;
607 		link_map_winsock_error_to_errno ();
608 	}
609 #endif
610 
611 	if (fd < 0) {
612 		d_printf ("socket() failed: %s\n", link_strerror (errno));
613 		goto out;
614 	}
615 
616 	if (options & LINK_CONNECTION_NONBLOCKING) {
617 #ifdef HAVE_WINSOCK2_H
618 		u_long yes = 1;
619 		if (ioctlsocket (fd, FIONBIO, &yes) == SOCKET_ERROR) {
620 			link_map_winsock_error_to_errno ();
621 			d_printf ("ioctlsocket(FIONBIO) failed: %s\n",
622 				  link_strerror (errno));
623 			goto out;
624 		}
625 #else
626 		if (fcntl (fd, F_SETFL, O_NONBLOCK) < 0)
627 			goto out;
628 #endif
629 	}
630 
631 #if defined (F_SETFD) && defined (FD_CLOEXEC)
632 	if (fcntl (fd, F_SETFD, FD_CLOEXEC) < 0)
633 		goto out;
634 #endif
635 #ifdef HAVE_WINSOCK2_H
636 	{
637 		SOCKET newfd;
638 
639 		if (!DuplicateHandle (GetCurrentProcess (), (HANDLE) fd,
640 				      GetCurrentProcess (), (LPHANDLE) &newfd,
641 				      0, FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE)) {
642 			d_printf ("DuplicateHandle failed: %s\n", link_strerror (WSAGetLastError ()));
643 			return FALSE;
644 		}
645 		fd = newfd;
646 	}
647 #endif
648 
649 #ifndef G_OS_WIN32
650 	if (!strcmp (proto_name, "UNIX") && getuid() == 0) {
651 		fix_permissions (service);
652 	}
653 #endif
654 
655 	LINK_TEMP_FAILURE_RETRY_SOCKET (connect (fd, saddr, saddr_len), rv);
656 #ifdef HAVE_WINSOCK2_H
657 	if (rv == SOCKET_ERROR) {
658 		if ((options & LINK_CONNECTION_NONBLOCKING) &&
659 		    WSAGetLastError () == WSAEWOULDBLOCK) {
660 			/* connect() for nonblocking sockets always
661 			 * fails with WSAEWOULDBLOCK. We have to
662 			 * select() to wait for actual status.
663 			 */
664 			fd_set write_fds, except_fds;
665 
666 			FD_ZERO (&write_fds);
667 			FD_SET (fd, &write_fds);
668 
669 			FD_ZERO (&except_fds);
670 			FD_SET (fd, &except_fds);
671 
672 			rv  = select (1, NULL, &write_fds, &except_fds, NULL);
673 			if (rv == SOCKET_ERROR) {
674 				rv = -1;
675 				link_map_winsock_error_to_errno ();
676 			} else if (FD_ISSET (fd, &write_fds)) {
677 				rv = 0;
678 			} else if (FD_ISSET (fd, &except_fds)) {
679 				rv = -1;
680 				errno = WSAECONNREFUSED;
681 			}
682 		} else {
683 			rv = -1;
684 			link_map_winsock_error_to_errno ();
685 		}
686 	}
687 #endif
688 	if (rv && errno != EINPROGRESS)
689 		goto out;
690 
691 	d_printf ("initiate 'connect' on new fd %d\n", fd);
692 
693 	g_assert (CNX_IS_LOCKED (0));
694 	link_connection_from_fd_T
695 		(cnx, fd, proto,
696 		 g_strdup (host), g_strdup (service),
697 		 TRUE, rv ? LINK_CONNECTING : LINK_CONNECTED,
698 		 options);
699 	retval = TRUE;
700 
701  out:
702 	if (!retval && fd >= 0) {
703 		d_printf ("initiation failed: %s\n", link_strerror (errno));
704 		d_printf ("closing %d\n", fd);
705 		LINK_CLOSE_SOCKET (fd);
706 	}
707 
708 	g_free (saddr);
709 
710 	return retval;
711 }
712 
713 static LinkConnectionStatus
link_connection_wait_connected_T(LinkConnection * cnx)714 link_connection_wait_connected_T (LinkConnection *cnx)
715 {
716 	while (cnx && cnx->status == LINK_CONNECTING)
717 		link_wait ();
718 
719 	return cnx ? cnx->status : LINK_DISCONNECTED;
720 }
721 
722 LinkConnectionStatus
link_connection_try_reconnect(LinkConnection * cnx)723 link_connection_try_reconnect (LinkConnection *cnx)
724 {
725 	LinkConnectionStatus status;
726 
727 	g_return_val_if_fail (LINK_IS_CONNECTION (cnx), LINK_DISCONNECTED);
728 
729 	CNX_LOCK (cnx);
730 
731 	d_printf ("Try for reconnection on %p: %d\n",
732 		  cnx, cnx->inhibit_reconnect);
733 
734 	while (cnx->inhibit_reconnect) {
735 		if (g_main_context_acquire (NULL)) {
736 			d_printf ("Dispatch callbacks in 'main' (mainloop owning) thread\n");
737 			cnx->inhibit_reconnect = FALSE;
738 			dispatch_callbacks_drop_lock (cnx);
739 			g_main_context_release (NULL);
740 		} else
741 			link_wait ();
742 	}
743 
744 	switch (cnx->status) {
745 	case LINK_DISCONNECTED :
746 	case LINK_TIMEOUT :
747 		link_connection_do_initiate
748 			(cnx, cnx->proto->name, cnx->remote_host_info,
749 			 cnx->remote_serv_info, cnx->options);
750 		break;
751 	default :
752 		g_warning ("trying to re-connect connected cnx.");
753 		break;
754 	}
755 
756 	cnx->priv->was_disconnected = TRUE;
757 	status = link_connection_wait_connected_T (cnx);
758 	cnx->priv->was_disconnected = FALSE;
759 
760 	CNX_UNLOCK (cnx);
761 
762 	return status;
763 }
764 
765 /**
766  * link_connection_initiate_list:
767  * @derived_type: a #LinkConnection derived type
768  * @proto_name: the name of the protocol to use.
769  * @host: protocol dependant host information.
770  * @service: protocol dependant service information(e.g. port number).
771  * @options: combination of #LinkConnectionOptions.
772  * @opt_construct_fn: optional constructor fn for new cnx's or NULL
773  * @user_data: optional user data for constructor
774  *
775  * Looks up a connection in our cnx. list to see if we already
776  * have a matching connection; if so returns it, otherwise
777  * constructs a new cnx. and retursn that
778  *
779  * Return value: an incremented cnx ref.
780  **/
781 LinkConnection *
link_connection_initiate(GType derived_type,const char * proto_name,const char * remote_host_info,const char * remote_serv_info,LinkConnectionOptions options,const char * first_property,...)782 link_connection_initiate (GType                 derived_type,
783 			  const char           *proto_name,
784 			  const char           *remote_host_info,
785 			  const char           *remote_serv_info,
786 			  LinkConnectionOptions options,
787 			  const char           *first_property,
788 			  ...)
789 {
790 	va_list args;
791 	GList *l;
792 	gboolean initiated = TRUE;
793 	LinkConnection *cnx = NULL;
794 	const LinkProtocolInfo *proto;
795 
796 	va_start (args, first_property);
797 
798 	proto = link_protocol_find (proto_name);
799 
800 	CNX_LIST_LOCK();
801 
802 	/* FIXME: hash this if it's slow */
803 	for (l = cnx_list; l; l = l->next) {
804 		cnx = l->data;
805 
806 		if (cnx->was_initiated && cnx->proto == proto &&
807 		    cnx->status != LINK_DISCONNECTED &&
808 		    ((cnx->options & LINK_CONNECTION_SSL) == (options & LINK_CONNECTION_SSL)) &&
809 		    !strcmp (remote_host_info, cnx->remote_host_info) &&
810 		    !strcmp (remote_serv_info, cnx->remote_serv_info)) {
811 			cnx = link_connection_ref_T (cnx);
812 			break;
813 		}
814 	}
815 
816 	cnx = l ? l->data : NULL;
817 
818 	if (!cnx) {
819 		cnx = LINK_CONNECTION
820 			(g_object_new_valist (derived_type, first_property, args));
821 
822 		initiated = link_connection_do_initiate
823 			(cnx, proto_name, remote_host_info,
824 			 remote_serv_info, options);
825 	}
826 
827 	CNX_LIST_UNLOCK();
828 
829 	if (!initiated) {
830 		link_connection_unref (cnx);
831 		cnx = NULL;
832 	}
833 
834 	va_end (args);
835 
836 	return cnx;
837 }
838 
/*
 * link_connection_state_changed:
 * @cnx: a #LinkConnection.
 * @status: a #LinkConnectionStatus.
 *
 * Locked wrapper: takes the lock, processes the state change through
 * link_connection_state_changed_T_R() - which in turn invokes the
 * #LinkConnectionClass's state change method - and releases the lock.
 */
void
link_connection_state_changed (LinkConnection      *cnx,
			       LinkConnectionStatus status)
{
	CNX_LOCK (cnx);
	link_connection_state_changed_T_R (cnx, status);
	CNX_UNLOCK (cnx);
}
854 
/**
 * link_connection_read:
 * @cnx: the connection to read from
 * @buf: a pointer to the start of an array of bytes to read data into
 * @len: the length of the array in bytes to read into
 * @block_for_full_read: whether to keep reading until @len bytes have
 *                       arrived
 *
 * Warning, block_for_full_read is of limited usefulness.
 *
 * The lock is held for the duration of the read.
 *
 * Return value: number of bytes read on success - 0 if @len is 0, and
 * possibly fewer than @len when a non-blocking connection would block
 * or when @block_for_full_read is FALSE; LINK_IO_FATAL_ERROR if the
 * connection is not in the connected state, on end of file, or on an
 * unrecoverable error.
 **/
glong
link_connection_read (LinkConnection *cnx,
		      guchar         *buf,
		      int             len,
		      gboolean        block_for_full_read)
{
	int bytes_read = 0;

	d_printf ("Read up to %d bytes from fd %d\n", len, cnx->priv->fd);

	if (!len)
		return 0;

	CNX_LOCK (cnx);

	if (cnx->status != LINK_CONNECTED)
		goto fatal_error;

	do {
		int n;

		/* exactly one of the three read paths below is compiled/run */
#ifdef LINK_SSL_SUPPORT
		if (cnx->options & LINK_CONNECTION_SSL)
			n = SSL_read (cnx->priv->ssl, buf, len);
		else
#endif
#ifdef HAVE_WINSOCK2_H
			if ((n = recv (cnx->priv->fd, buf, len, 0)) == SOCKET_ERROR) {
				n = -1;
				link_map_winsock_error_to_errno ();
				d_printf ("recv failed: %s\n",
					  link_strerror (errno));
			}
#else
			LINK_TEMP_FAILURE_RETRY_SYSCALL (read (cnx->priv->fd,
							       buf,
							       len), n);
#endif
		g_assert (n <= len);

		if (n < 0) {
#ifdef LINK_SSL_SUPPORT
			if (cnx->options & LINK_CONNECTION_SSL) {
				gulong rv;

				rv = SSL_get_error (cnx->priv->ssl, n);

				/* "try again later" is only acceptable when non-blocking */
				if ((rv == SSL_ERROR_WANT_READ ||
				     rv == SSL_ERROR_WANT_WRITE) &&
				    (cnx->options & LINK_CONNECTION_NONBLOCKING))
					goto out;
				else
					goto fatal_error;
			} else
#endif
			{
				if (errno == EINTR)
					continue;

				/* would block: report what has been read so far */
				else if (errno == EAGAIN &&
					 (cnx->options & LINK_CONNECTION_NONBLOCKING))
					goto out;

				else if (errno == EBADF) {
					g_warning ("Serious fd usage error %d", cnx->priv->fd);
					goto fatal_error;

				} else
					goto fatal_error;
			}

		} else if (n == 0) {
			/* EOF is reported as a fatal error, discarding any partial count */
			d_printf ("we got EOF on fd %d\n", cnx->priv->fd);
			bytes_read = LINK_IO_FATAL_ERROR;
			goto out;
		} else {
			buf += n;
			len -= n;
			bytes_read += n;
#ifdef CONNECTION_DEBUG
			cnx->priv->total_read_bytes += n;
#endif
		}
	} while (len > 0 && block_for_full_read);

#ifdef CONNECTION_DEBUG
	d_printf ("we read %d bytes (total %"G_GUINT64_FORMAT")\n",
		  bytes_read, cnx->priv->total_read_bytes);
#endif

 out:
	CNX_UNLOCK (cnx);
	return bytes_read;

 fatal_error:
	CNX_UNLOCK (cnx);
	return LINK_IO_FATAL_ERROR;
}
964 
/*
 * Determine the maximum size of the iovec vector.
 *
 * Either LINK_IOV_MAX is defined directly as a constant, or - when the
 * limit has to be queried with sysconf() - LINK_IOV_MAX_INIT is defined
 * and LINK_IOV_MAX is mapped onto a variable further down.
 */

#if defined (MAXIOV) /* HPUX */
# define LINK_IOV_MAX (MAXIOV)
#elif defined (IOV_MAX) /* AIX */
# define LINK_IOV_MAX (IOV_MAX)
#elif defined (_SC_IOV_MAX) /* SGI */
# define LINK_IOV_MAX_INIT (sysconf (_SC_IOV_MAX))
#elif defined (__APPLE__)
/* Even though the write(2) man page mentions it, UIO_MAXIOV is only
 * available if KERNEL is defined on MacOS X 10.1
 */
#  define LINK_IOV_MAX 1024
#elif defined (UIO_MAXIOV) /* Glibc */
# define LINK_IOV_MAX (UIO_MAXIOV)
#else /* Safe Guess */
# define LINK_IOV_MAX 16
#endif

/*
 * If the value requires initialization, define the function here.
 *
 * link_iov_max_init() fills in link_iov_max on first use: the variable
 * is tested without the lock, then tested again under a static lock
 * before being written. A result of zero or less from the query is
 * replaced by 16. When no initialization is needed the call expands to
 * nothing.
 */
#if defined (LINK_IOV_MAX_INIT)
# define LINK_IOV_MAX link_iov_max
  static guint link_iov_max = 0;
  static inline void
  link_iov_max_init ()
  {
    if (link_iov_max == 0)
      {
        gint max;
        G_LOCK_DEFINE_STATIC (link_iov_max);
        G_LOCK (link_iov_max);
        if (link_iov_max == 0)
          {
            max = LINK_IOV_MAX_INIT;
            if (max <= 0)
              max = 16;
            link_iov_max = max;
          }
        G_UNLOCK (link_iov_max);
      }
  }
#else
# define link_iov_max_init()
#endif
1009 
/*
 * write_data_T:
 * @cnx: the connection to write to; must be in the connected state
 * @qw: the queued write to push out
 *
 * Writes as much of @qw as possible. @qw is updated in place as data
 * goes out: fully written vectors are skipped (vecs advanced, nvecs
 * decremented) and a partially written vector has its base and length
 * adjusted, so a later call resumes where this one stopped.
 *
 * On the SSL path only the current vector is passed to SSL_write() per
 * iteration; the other paths pass up to LINK_IOV_MAX vectors at once.
 *
 * Return value: the number of bytes written by this call once the loop
 * runs to completion; LINK_IO_QUEUED_DATA if a non-blocking connection
 * would block; LINK_IO_FATAL_ERROR if the connection is not connected,
 * on a zero-byte write or on any other error.
 */
static glong
write_data_T (LinkConnection *cnx, QueuedWrite *qw)
{
	glong bytes_written = 0;

	g_return_val_if_fail (cnx->status == LINK_CONNECTED,
			      LINK_IO_FATAL_ERROR);

	link_iov_max_init ();

	while ((qw->nvecs > 0) && (qw->vecs->iov_len > 0)) {
		int n;

		d_printf ("write_data %ld bytes to fd %d - ",
			  calc_size (qw->vecs, qw->nvecs), cnx->priv->fd);

#ifdef LINK_SSL_SUPPORT
		if (cnx->options & LINK_CONNECTION_SSL)
			n = SSL_write (cnx->priv->ssl, qw->vecs->iov_base,
				       qw->vecs->iov_len);
		else
#endif
#ifdef HAVE_WINSOCK2_H
			{
				if (WSASend (cnx->priv->fd, qw->vecs,
					     MIN (qw->nvecs, LINK_IOV_MAX),
					     (LPDWORD) &n, 0, NULL, NULL) == SOCKET_ERROR) {
					if (WSAGetLastError () == WSAEWOULDBLOCK)
						link_win32_watch_set_write_wouldblock (cnx->priv->tag, TRUE);
					n = -1;
					link_map_winsock_error_to_errno ();
					d_printf ("WSASend failed: %s\n",
						  link_strerror (errno));
				} else {
					link_win32_watch_set_write_wouldblock (cnx->priv->tag, FALSE);
				}
			}
#else
			LINK_TEMP_FAILURE_RETRY_SOCKET (writev (cnx->priv->fd,
								qw->vecs,
								MIN (qw->nvecs, LINK_IOV_MAX)), n);
#endif
#ifdef CONNECTION_DEBUG
		/* the comma expression updates the running total before printing it */
		d_printf ("wrote %d bytes (total %"G_GUINT64_FORMAT")\n",
			  n,
			  (cnx->priv->total_written_bytes += ((n > 0) ? n : 0),
			   cnx->priv->total_written_bytes));
#endif
		if (n < 0) {
#ifdef LINK_SSL_SUPPORT
			if (cnx->options & LINK_CONNECTION_SSL) {
				gulong rv;

				rv = SSL_get_error (cnx->priv->ssl, n);

				if ((rv == SSL_ERROR_WANT_READ ||
				     rv == SSL_ERROR_WANT_WRITE) &&
				    cnx->options & LINK_CONNECTION_NONBLOCKING)
					return LINK_IO_QUEUED_DATA;
				else
					return LINK_IO_FATAL_ERROR;
			} else
#endif
			{
				if (errno == EINTR)
					continue;

				else if (errno == EAGAIN &&
					 (cnx->options & LINK_CONNECTION_NONBLOCKING))
					return LINK_IO_QUEUED_DATA;

				/* EBADF is logged and then treated like any other fatal error */
				else if (errno == EBADF)
					g_warning ("Serious fd usage error %d", cnx->priv->fd);

				return LINK_IO_FATAL_ERROR; /* Unhandlable error */
			}

		} else if (n == 0) /* CHECK: is this really an error condition */
			return LINK_IO_FATAL_ERROR;

		else {
			bytes_written += n;

			/* skip over every vector that went out completely */
			while (qw->nvecs > 0 && n >= qw->vecs->iov_len) {
				n -= qw->vecs->iov_len;
				qw->nvecs--;
				qw->vecs++;
			}

			/* leftover bytes belong to a partially written vector */
			if (n) {
				qw->vecs->iov_len  -= n;
				qw->vecs->iov_base = (guchar *)qw->vecs->iov_base + n;
			}
		}
	}

	return bytes_written;
}
1108 
1109 static gboolean
link_connection_should_block(LinkConnection * cnx,const LinkWriteOpts * opt_write_opts)1110 link_connection_should_block (LinkConnection      *cnx,
1111 			      const LinkWriteOpts *opt_write_opts)
1112 {
1113 	if (!opt_write_opts)
1114 		return TRUE;
1115 
1116 	if (opt_write_opts->block_on_write)
1117 		return TRUE;
1118 
1119 	return FALSE;
1120 }
1121 
1122 /* Always called in the I/O thread */
1123 static void
link_connection_flush_write_queue_T_R(LinkConnection * cnx)1124 link_connection_flush_write_queue_T_R (LinkConnection *cnx)
1125 {
1126 	gboolean done_writes = TRUE;
1127 
1128 	if (cnx->priv->write_queue) {
1129 		glong        status;
1130 		QueuedWrite *qw = cnx->priv->write_queue->data;
1131 
1132 		status = write_data_T (cnx, qw);
1133 
1134 		d_printf ("Wrote queue %ld on fd %d\n", status, cnx->priv->fd);
1135 
1136 		if (status >= LINK_IO_OK) {
1137 			cnx->priv->write_queue = g_list_delete_link
1138 				(cnx->priv->write_queue, cnx->priv->write_queue);
1139 			queued_write_free (qw);
1140 
1141 			queue_signal_T_R (cnx, -status);
1142 
1143 			done_writes = (cnx->priv->write_queue == NULL);
1144 
1145 		} else {
1146 			if (status == LINK_IO_FATAL_ERROR) {
1147 				d_printf ("Fatal error on queued write\n");
1148 				link_connection_state_changed_T_R (cnx, LINK_DISCONNECTED);
1149 
1150 			} else {
1151 				d_printf ("Write blocked\n");
1152 				done_writes = FALSE;
1153 			}
1154 		}
1155 	}
1156 
1157 	d_printf ("Blocked write queue %s\n", done_writes ?
1158 		  "flushed & empty" : "still active");
1159 
1160 	if (done_writes) /* drop G_IO_OUT */
1161 		link_watch_set_condition
1162 			(cnx->priv->tag,
1163 			 LINK_ERR_CONDS | LINK_IN_CONDS);
1164 	else
1165 		link_watch_set_condition
1166 			(cnx->priv->tag,
1167 			 LINK_ERR_CONDS | LINK_IN_CONDS | G_IO_OUT);
1168 }
1169 
1170 void
link_connection_exec_set_condition(LinkCommandSetCondition * cmd,gboolean immediate)1171 link_connection_exec_set_condition (LinkCommandSetCondition *cmd, gboolean immediate)
1172 {
1173 	d_printf ("Exec defered set condition on %p -> 0x%x\n",
1174 		  cmd->cnx, cmd->condition);
1175 
1176 	if (!immediate)
1177 		CNX_LOCK (cmd->cnx);
1178 
1179 	link_watch_set_condition (cmd->cnx->priv->tag, cmd->condition);
1180 
1181 	if (!immediate)
1182 		link_connection_unref_unlock (cmd->cnx);
1183 
1184 	else /* special */
1185 		link_connection_unref_T_ (cmd->cnx);
1186 
1187 	g_free (cmd);
1188 }
1189 
1190 /**
1191  * link_connection_writev:
1192  * @cnx: the connection to write to
1193  * @vecs: a structure of iovecs to write - this is altered.
1194  * @nvecs: the number of populated iovecs
1195  * @opt_write_opts: optional write options, or NULL
1196  *
1197  * This routine writes data to the abstract connection.
1198  * FIXME: it allows re-enterancy via link_connection_iterate
1199  *        in certain cases.
1200  * FIXME: on this basis, the connection can die underneath
1201  *        our feet.
1202  *
1203  * Return value: 0 on success, non 0 on error.
1204  **/
1205 LinkIOStatus
link_connection_writev(LinkConnection * cnx,struct iovec * vecs,int nvecs,const LinkWriteOpts * opt_write_opts)1206 link_connection_writev (LinkConnection       *cnx,
1207 			struct iovec         *vecs,
1208 			int                   nvecs,
1209 			const LinkWriteOpts  *opt_write_opts)
1210 {
1211 	QueuedWrite qw;
1212 	int         status;
1213 
1214 	CNX_LOCK (cnx);
1215 	link_connection_ref_T (cnx);
1216 
1217 	if (link_thread_safe ()) {
1218 		d_printf ("Thread safe writev\n");
1219 		if (cnx->status == LINK_CONNECTING) {
1220 			queue_flattened_T_R (cnx, vecs, nvecs, TRUE);
1221 			link_connection_unref_unlock (cnx);
1222 			return LINK_IO_QUEUED_DATA;
1223 		}
1224 	} else if (cnx->options & LINK_CONNECTION_NONBLOCKING)
1225 		link_connection_wait_connected (cnx);
1226 
1227 	if (cnx->status == LINK_DISCONNECTED) {
1228 		link_connection_unref_unlock (cnx);
1229 		return LINK_IO_FATAL_ERROR;
1230 	}
1231 
1232 	if (cnx->priv->write_queue) {
1233 		/* FIXME: we should really retry the write here, but we'll
1234 		 * get a POLLOUT for this lot at some stage anyway */
1235 		queue_flattened_T_R (cnx, vecs, nvecs, FALSE);
1236 		link_connection_unref_unlock (cnx);
1237 		return LINK_IO_QUEUED_DATA;
1238 	}
1239 
1240 	qw.vecs  = vecs;
1241 	qw.nvecs = nvecs;
1242 
1243  continue_write:
1244 	status = write_data_T (cnx, &qw);
1245 
1246 	if (status == LINK_IO_QUEUED_DATA) {
1247 		if (link_thread_safe ()) {
1248 			queue_flattened_T_R (cnx, qw.vecs, qw.nvecs, TRUE);
1249 			link_connection_unref_unlock (cnx);
1250 			return LINK_IO_QUEUED_DATA;
1251 		}
1252 
1253 		/* Queue data & listen for buffer space */
1254 		link_watch_set_condition
1255 			(cnx->priv->tag,
1256 			 LINK_ERR_CONDS | LINK_IN_CONDS | G_IO_OUT);
1257 
1258 		if (!link_connection_should_block (cnx, opt_write_opts)) {
1259 			queue_flattened_T_R (cnx, qw.vecs, qw.nvecs, FALSE);
1260 			link_connection_unref_unlock (cnx);
1261 			return LINK_IO_QUEUED_DATA;
1262 
1263 		} else {
1264 			link_main_iteration (TRUE);
1265 			goto continue_write;
1266 		}
1267 
1268 	} else if (status >= LINK_IO_OK)
1269 		status = LINK_IO_OK;
1270 
1271 	link_connection_unref_unlock (cnx);
1272 
1273 	return status;
1274 }
1275 
1276 /**
1277  * link_connection_write:
1278  * @cnx: the connection to write to
1279  * @buf: a pointer to the start of an array of bytes
1280  * @len: the length of the array in bytes
1281  * @opt_write_opts: optional write options, or NULL
1282  *
1283  * Writes a contiguous block of data to the abstract connection.
1284  *
1285  * FIXME: it allows re-enterancy via link_connection_iterate
1286  *        in certain cases.
1287  * FIXME: on this basis, the connection can die underneath
1288  *        our feet eg. between the main_iteration and the
1289  *        g_return_if_fail.
1290  *
1291  * Return value: 0 on success, non 0 on error.
1292  **/
1293 LinkIOStatus
link_connection_write(LinkConnection * cnx,const guchar * buf,gulong len,const LinkWriteOpts * opt_write_opts)1294 link_connection_write (LinkConnection       *cnx,
1295 		       const guchar         *buf,
1296 		       gulong                len,
1297 		       const LinkWriteOpts  *opt_write_opts)
1298 {
1299 	struct iovec vec;
1300 
1301 	vec.iov_base = (guchar *) buf;
1302 	vec.iov_len  = len;
1303 
1304 	return link_connection_writev (cnx, &vec, 1, opt_write_opts);
1305 }
1306 
1307 static void
link_connection_dispose(GObject * obj)1308 link_connection_dispose (GObject *obj)
1309 {
1310 	LinkConnection *cnx = (LinkConnection *)obj;
1311 
1312 	d_printf ("dispose connection %p\n", obj);
1313 
1314 	link_source_remove (cnx);
1315 	queue_free (cnx);
1316 
1317 	parent_class->dispose (obj);
1318 }
1319 
1320 static void
link_connection_finalize(GObject * obj)1321 link_connection_finalize (GObject *obj)
1322 {
1323 	GSList *l;
1324 	LinkConnection *cnx = (LinkConnection *)obj;
1325 
1326 	link_close_fd (cnx);
1327 
1328 	for (l = cnx->idle_broken_callbacks; l; l = l->next)
1329 		g_free (l->data);
1330 	g_slist_free (cnx->idle_broken_callbacks);
1331 
1332 	g_free (cnx->remote_host_info);
1333 	g_free (cnx->remote_serv_info);
1334 
1335 	g_free (cnx->priv);
1336 
1337 	if (cnx->timeout_mutex)
1338 		g_mutex_free (cnx->timeout_mutex);
1339 	if (cnx->timeout_source_id)
1340 		link_io_thread_remove_timeout (cnx->timeout_source_id);
1341 
1342 	parent_class->finalize (obj);
1343 }
1344 
1345 static void
link_connection_init(LinkConnection * cnx)1346 link_connection_init (LinkConnection *cnx)
1347 {
1348 	d_printf ("create new connection %p\n", cnx);
1349 
1350 	cnx->priv = g_new0 (LinkConnectionPrivate, 1);
1351 	cnx->priv->fd = -1;
1352 	cnx->priv->was_disconnected = FALSE;
1353 
1354 	cnx->timeout_mutex = NULL;
1355 	cnx->timeout_msec = 0;
1356 	cnx->timeout_source_id = 0;
1357 	cnx->timeout_status = LINK_TIMEOUT_UNKNOWN;
1358 	cnx->tdata = NULL;
1359 
1360 #ifdef CONNECTION_DEBUG
1361 	cnx->priv->total_read_bytes = 0;
1362 	cnx->priv->total_written_bytes = 0;
1363 #endif
1364 }
1365 
1366 static void
link_connection_class_init(LinkConnectionClass * klass)1367 link_connection_class_init (LinkConnectionClass *klass)
1368 {
1369 	GObjectClass *object_class = (GObjectClass *) klass;
1370 
1371 	object_class->dispose  = link_connection_dispose;
1372 	object_class->finalize = link_connection_finalize;
1373 
1374 	signals [BROKEN] =
1375 		g_signal_new ("broken",
1376 			      G_TYPE_FROM_CLASS (object_class),
1377 			      G_SIGNAL_RUN_LAST,
1378 			      G_STRUCT_OFFSET (LinkConnectionClass, broken),
1379 			      NULL, NULL,
1380 			      g_cclosure_marshal_VOID__VOID,
1381 			      G_TYPE_NONE, 0);
1382 
1383 	signals [BLOCKING] =
1384 		g_signal_new ("blocking",
1385 			      G_TYPE_FROM_CLASS (object_class),
1386 			      G_SIGNAL_RUN_LAST,
1387 			      G_STRUCT_OFFSET (LinkConnectionClass, blocking),
1388 			      NULL, NULL,
1389 			      g_cclosure_marshal_VOID__ULONG,
1390 			      G_TYPE_NONE, 1, G_TYPE_ULONG);
1391 
1392 	parent_class = g_type_class_peek_parent (klass);
1393 }
1394 
1395 GType
link_connection_get_type(void)1396 link_connection_get_type (void)
1397 {
1398 	static GType object_type = 0;
1399 
1400 	if (!object_type) {
1401 		static const GTypeInfo object_info = {
1402 			sizeof (LinkConnectionClass),
1403 			(GBaseInitFunc) NULL,
1404 			(GBaseFinalizeFunc) NULL,
1405 			(GClassInitFunc) link_connection_class_init,
1406 			NULL,           /* class_finalize */
1407 			NULL,           /* class_data */
1408 			sizeof (LinkConnection),
1409 			0,              /* n_preallocs */
1410 			(GInstanceInitFunc) link_connection_init
1411 		};
1412 
1413 		object_type = g_type_register_static (G_TYPE_OBJECT,
1414 						      "LinkConnection",
1415 						      &object_info,
1416 						      0);
1417 	}
1418 
1419 	return object_type;
1420 }
1421 
1422 
1423 LinkWriteOpts *
link_write_options_new(gboolean block_on_write)1424 link_write_options_new (gboolean block_on_write)
1425 {
1426 	LinkWriteOpts *write_opts = g_new0 (LinkWriteOpts, 1);
1427 
1428 	write_opts->block_on_write = block_on_write;
1429 
1430 	return write_opts;
1431 }
1432 
/*
 * Releases @write_opts with g_free(); counterpart of
 * link_write_options_new().
 */
void
link_write_options_free (LinkWriteOpts *write_opts)
{
	g_free (write_opts);
}
1438 
/*
 * Stores @max_buffer_bytes in the connection's private data while
 * holding the connection lock. A NULL @cnx is rejected by the guard.
 * The new limit is only recorded here; nothing already buffered is
 * re-checked against it (see the FIXME below).
 */
void
link_connection_set_max_buffer (LinkConnection *cnx,
				gulong          max_buffer_bytes)
{
	g_return_if_fail (cnx != NULL);

	CNX_LOCK (cnx);
	/* FIXME: we might want to check the current buffer size */
	cnx->priv->max_buffer_bytes = max_buffer_bytes;

	CNX_UNLOCK (cnx);
}
1451 
1452 static gboolean
link_connection_io_handler(GIOChannel * gioc,GIOCondition condition,gpointer data)1453 link_connection_io_handler (GIOChannel  *gioc,
1454 			    GIOCondition condition,
1455 			    gpointer     data)
1456 {
1457 	LinkConnection      *cnx = data;
1458 	LinkConnectionClass *klass;
1459 
1460 	d_printf ("link_connection_io_handler fd %d, 0x%x\n",
1461 		  cnx->priv->fd, condition);
1462 
1463 	CNX_LOCK (cnx);
1464 	link_connection_ref_T (cnx);
1465 
1466 	klass = (LinkConnectionClass *) G_TYPE_INSTANCE_GET_CLASS (
1467 		data, LINK_TYPE_CONNECTION, LinkConnection);
1468 
1469 	if (cnx->status == LINK_CONNECTED &&
1470 	    condition & LINK_IN_CONDS && klass->handle_input) {
1471 
1472 		d_printf ("Handle input on fd %d\n", cnx->priv->fd);
1473 
1474 		CNX_UNLOCK (cnx);
1475 		klass->handle_input (cnx);
1476 		CNX_LOCK (cnx);
1477 	}
1478 
1479 	if (cnx->status == LINK_CONNECTED && condition & G_IO_OUT) {
1480 		d_printf ("IO Out - buffer space free ...\n");
1481 		link_connection_flush_write_queue_T_R (cnx);
1482 	}
1483 
1484 	if (condition & (LINK_ERR_CONDS | G_IO_OUT)) {
1485 		int rv, n;
1486 		LinkSockLen n_size = sizeof (n);
1487 
1488 		switch (cnx->status) {
1489 		case LINK_CONNECTING:
1490 			n = 0;
1491 			rv = getsockopt (cnx->priv->fd, SOL_SOCKET, SO_ERROR, (char *) &n, &n_size);
1492 			if (!rv && !n && condition == G_IO_OUT) {
1493 				d_printf ("State changed to connected on %d\n", cnx->priv->fd);
1494 
1495 				link_watch_set_condition (
1496 					cnx->priv->tag,
1497 					LINK_ERR_CONDS | LINK_IN_CONDS);
1498 
1499 				link_connection_state_changed_T_R (cnx, LINK_CONNECTED);
1500 
1501 				if (cnx->priv->write_queue) {
1502 					d_printf ("Connected, with queued writes, start flush ...\n");
1503 					link_connection_flush_write_queue_T_R (cnx);
1504 				}
1505 			} else {
1506 				d_printf ("Error connecting %d %d %d on fd %d\n",
1507 					   rv, n, errno, cnx->priv->fd);
1508 				link_connection_state_changed_T_R (cnx, LINK_DISCONNECTED);
1509 			}
1510 			break;
1511 		case LINK_CONNECTED: {
1512 			if (condition & LINK_ERR_CONDS) {
1513 				d_printf ("Disconnect on err: %d\n", cnx->priv->fd);
1514 				link_connection_state_changed_T_R (cnx, LINK_DISCONNECTED);
1515 			}
1516 			break;
1517 		}
1518 		default:
1519 			break;
1520 		}
1521 	}
1522 
1523 	link_connection_unref_unlock (cnx);
1524 
1525 	return TRUE;
1526 }
1527 
1528 LinkConnectionStatus
link_connection_get_status(LinkConnection * cnx)1529 link_connection_get_status (LinkConnection *cnx)
1530 {
1531 	LinkConnectionStatus status;
1532 
1533 	CNX_LOCK (cnx);
1534 	status = cnx->status;
1535 	CNX_UNLOCK (cnx);
1536 
1537 	d_printf ("Get status on %p = %d\n", cnx, status);
1538 
1539 	return status;
1540 }
1541 
1542 void
link_connection_exec_disconnect(LinkCommandDisconnect * cmd,gboolean immediate)1543 link_connection_exec_disconnect (LinkCommandDisconnect *cmd, gboolean immediate)
1544 {
1545 	d_printf ("Exec defered disconnect on %p\n", cmd->cnx);
1546 
1547 	link_connection_state_changed (cmd->cnx, LINK_DISCONNECTED);
1548 
1549 	link_connection_unref (cmd->cnx);
1550 	g_free (cmd);
1551 }
1552 
1553 void
link_connection_disconnect(LinkConnection * cnx)1554 link_connection_disconnect (LinkConnection *cnx)
1555 {
1556 	LinkCommandDisconnect *cmd;
1557 
1558 	cmd = g_new (LinkCommandDisconnect, 1);
1559 	cmd->cmd.type = LINK_COMMAND_DISCONNECT;
1560 	cmd->cnx = link_connection_ref (cnx);
1561 
1562 	link_exec_command ((LinkCommand *) cmd);
1563 }
1564 
1565 LinkConnectionStatus
link_connection_wait_connected(LinkConnection * cnx)1566 link_connection_wait_connected (LinkConnection *cnx)
1567 {
1568 	LinkConnectionStatus status;
1569 
1570 	CNX_LOCK (cnx);
1571 
1572 	status = link_connection_wait_connected_T (cnx);
1573 
1574 	CNX_UNLOCK (cnx);
1575 
1576 	return status;
1577 }
1578 
1579 void
link_connections_move_io_T(gboolean to_io_thread)1580 link_connections_move_io_T (gboolean to_io_thread)
1581 {
1582 	GList *l;
1583 	for (l = cnx_list; l; l = l->next) {
1584 		LinkConnection *cnx = l->data;
1585 		link_watch_move_io (cnx->priv->tag, to_io_thread);
1586 	}
1587 }
1588 
1589 void
link_connection_add_broken_cb(LinkConnection * cnx,LinkBrokenCallback fn,gpointer user_data)1590 link_connection_add_broken_cb (LinkConnection    *cnx,
1591 			       LinkBrokenCallback fn,
1592 			       gpointer           user_data)
1593 {
1594 	BrokenCallback *bc = g_new0 (BrokenCallback, 1);
1595 
1596 	g_return_if_fail (fn != NULL);
1597 
1598 	bc->fn = fn;
1599 	bc->user_data = user_data;
1600 
1601 	cnx->idle_broken_callbacks = g_slist_prepend (cnx->idle_broken_callbacks, bc);
1602 }
1603 
1604 static gboolean
broken_callback_match(BrokenCallback * bc,LinkBrokenCallback fn,gpointer user_data)1605 broken_callback_match (BrokenCallback    *bc,
1606 		       LinkBrokenCallback fn,
1607 		       gpointer           user_data)
1608 {
1609 	return ( (!fn || bc->fn == fn) &&
1610 		 (!user_data || bc->user_data == user_data) );
1611 }
1612 
1613 void
link_connection_remove_broken_cb(LinkConnection * cnx,LinkBrokenCallback opt_fn,gpointer opt_user_data)1614 link_connection_remove_broken_cb (LinkConnection    *cnx,
1615 				  LinkBrokenCallback opt_fn,
1616 				  gpointer           opt_user_data)
1617 {
1618 	GSList *l, *next;
1619 
1620 	CNX_LOCK (cnx);
1621 
1622 	for (l = cnx->idle_broken_callbacks; l; l = next) {
1623 		next = l->next;
1624 		if (broken_callback_match (l->data, opt_fn, opt_user_data)) {
1625 			g_free (l->data);
1626 			cnx->idle_broken_callbacks =
1627 				g_slist_delete_link (cnx->idle_broken_callbacks,
1628 						     l);
1629 		}
1630 	}
1631 
1632 	CNX_UNLOCK (cnx);
1633 }
1634 
1635 void
link_connections_close(void)1636 link_connections_close (void)
1637 {
1638 	GList *cnx, *l;
1639 
1640 	if (!link_in_io_thread ())
1641 		return;
1642 
1643 	CNX_LIST_LOCK();
1644 	cnx = cnx_list;
1645 	cnx_list = NULL;
1646 	CNX_LIST_UNLOCK();
1647 
1648 	if (!cnx)
1649 		return;
1650 
1651 	/* FIXME: Need to shutdown linc connections ... */
1652 
1653 	for (l = cnx; l; l = l->next)
1654 		g_object_run_dispose (l->data);
1655 
1656 	g_list_free (cnx);
1657 }
1658 
/*
 * Stores @msec in the file-level _link_timeout variable.
 * NOTE(review): the unit is presumably milliseconds, going by the
 * parameter name - confirm against the code that reads _link_timeout.
 */
void
link_set_timeout (guint msec)
{
	_link_timeout = msec;
}
1664 
1665