1 /*
2  * linc.c: This file is part of the linc library.
3  *
4  * Authors:
5  *    Elliot Lee     (sopwith@redhat.com)
6  *    Michael Meeks  (michael@ximian.com)
7  *    Mark McLouglin (mark@skynet.ie) & others
8  *
9  * Copyright 2001, Red Hat, Inc., Ximian, Inc.,
10  *                 Sun Microsystems, Inc.
11  */
12 #include <config.h>
13 
14 #include <stdio.h>
15 #include <errno.h>
16 #ifdef HAVE_UNISTD_H
17 #  include <unistd.h>
18 #endif
19 #include <signal.h>
20 #include <fcntl.h>
21 #include "linc-private.h"
22 #include "linc-compat.h"
23 
24 #include <glib/gstdio.h>
25 
26 /* whether we do locking or not */
27 static gboolean link_is_thread_safe = TRUE;
28 /* an inferior loop/context for std. processing */
29 GMainLoop             *link_loop = NULL;
30 static GMainContext   *link_context = NULL;
31 /* an inferior context for the I/O thread */
32 static GThread        *link_io_thread = NULL;
33 static GMainLoop      *link_thread_loop = NULL;
34 static GMainContext   *link_thread_context = NULL;
35 static gboolean        link_is_io_in_thread = FALSE;
36 
37 /* a big global lock for link */
38 static GMutex  *link_main_lock;
39 static GCond   *link_main_cond;
40 /* command dispatch to the I/O loop */
41 static GMutex  *link_cmd_queue_lock = NULL;
42 static GCond   *link_cmd_queue_cond = NULL;
43 static GList   *link_cmd_queue = NULL;
44 
45 static int link_wakeup_fds[2] = { -1, -1 };
46 #define LINK_WAKEUP_POLL  link_wakeup_fds [0]
47 #define LINK_WAKEUP_WRITE link_wakeup_fds [1]
48 static GSource *link_main_source = NULL;
49 
50 #ifdef LINK_SSL_SUPPORT
51 SSL_METHOD *link_ssl_method;
52 SSL_CTX    *link_ssl_ctx;
53 #endif
54 
55 static void link_dispatch_command (gpointer data, gboolean immediate);
56 
57 gboolean
link_thread_io(void)58 link_thread_io (void)
59 {
60 	gboolean result;
61 
62 	/* FIXME: re-factor this to avoid locking */
63 	result = link_io_thread != NULL;
64 
65 	return result;
66 }
67 
68 gboolean
link_thread_safe(void)69 link_thread_safe (void)
70 {
71 	return link_is_thread_safe;
72 }
73 
74 static gboolean
cmd_is_sync(LinkCommand * cmd)75 cmd_is_sync (LinkCommand *cmd)
76 {
77 	return (cmd->type == LINK_COMMAND_SET_IO_THREAD) ||
78 		(cmd->type == LINK_COMMAND_CNX_UNREF);
79 }
80 
81 static gboolean
link_mainloop_handle_input(GIOChannel * source,GIOCondition condition,gpointer data)82 link_mainloop_handle_input (GIOChannel   *source,
83 			    GIOCondition  condition,
84 			    gpointer      data)
85 {
86 	char c;
87 	GList *l, *queue;
88 
89 	g_mutex_lock (link_cmd_queue_lock);
90 
91 #ifdef HAVE_WINSOCK2_H
92 	recv (LINK_WAKEUP_POLL, &c, sizeof (c), 0);
93 #else
94 	read (LINK_WAKEUP_POLL, &c, sizeof (c));
95 #endif
96 	queue = link_cmd_queue;
97 	link_cmd_queue = NULL;
98 
99 	g_mutex_unlock (link_cmd_queue_lock);
100 
101 	for (l = queue; l; l = l->next) {
102 		gboolean sync;
103 
104 		sync = cmd_is_sync (l->data);
105 
106 		link_dispatch_command (l->data, FALSE);
107 
108 		if (sync) {
109 			g_mutex_lock (link_cmd_queue_lock);
110 			((LinkSyncCommand *)l->data)->complete = TRUE;
111 			g_cond_broadcast (link_cmd_queue_cond);
112 			g_mutex_unlock (link_cmd_queue_lock);
113 		}
114 	}
115 
116 	g_list_free (queue);
117 
118 	return TRUE;
119 }
120 
121 void
link_exec_command(LinkCommand * cmd)122 link_exec_command (LinkCommand *cmd)
123 {
124 	int  res = 0;
125 
126 	if (link_in_io_thread ()) {
127 		link_dispatch_command (cmd, TRUE);
128 		return;
129 	}
130 
131 	LINK_MUTEX_LOCK (link_cmd_queue_lock);
132 
133 	if (LINK_WAKEUP_WRITE == -1) { /* shutdown main loop */
134 		LINK_MUTEX_UNLOCK (link_cmd_queue_lock);
135 		link_dispatch_command (cmd, TRUE);
136 		return;
137 	}
138 
139 	if (!link_cmd_queue) {
140 		char c = 'L'; /* magic */
141 #ifdef HAVE_WINSOCK2_H
142 		while ((res = send (LINK_WAKEUP_WRITE, &c, sizeof (c), 0)) == SOCKET_ERROR  &&
143 		       (WSAGetLastError () == WSAEWOULDBLOCK));
144 #else
145 		while ((res = write (LINK_WAKEUP_WRITE, &c, sizeof (c))) < 0  &&
146 		       (errno == EAGAIN || errno == EINTR));
147 #endif
148 	}
149 
150 	link_cmd_queue = g_list_append (link_cmd_queue, cmd);
151 
152 	if (cmd_is_sync (cmd))
153 		while (!((LinkSyncCommand *)cmd)->complete)
154 			g_cond_wait (link_cmd_queue_cond,
155 				     link_cmd_queue_lock);
156 
157 	LINK_MUTEX_UNLOCK (link_cmd_queue_lock);
158 
159 	if (res < 0)
160 		g_error ("Failed to write to linc wakeup socket %d 0x%x(%d) (%d)",
161 			 res, errno, errno, LINK_WAKEUP_WRITE);
162 }
163 
#if defined(CONNECTION_DEBUG) && defined(CONNECTION_DEBUG_FLAG)
/* switched on by link_init() when LINK_CONNECTION_DEBUG is set in the environment */
gboolean link_connection_debug_flag = FALSE;
#endif
167 
168 /**
169  * link_init:
170  * @thread_safe: if we want thread safety enabled.
171  *
172  * Initialize linc.
173  **/
174 void
link_init(gboolean thread_safe)175 link_init (gboolean thread_safe)
176 {
177 #if defined (CONNECTION_DEBUG) && defined (CONNECTION_DEBUG_FLAG)
178 	if (getenv ("LINK_CONNECTION_DEBUG"))
179 		link_connection_debug_flag = TRUE;
180 	if (link_connection_debug_flag &&
181 	    getenv ("LINK_PER_PROCESS_STDERR") &&
182 	    fileno (stderr) >= 0) {
183 		char *stderr_file = g_build_filename (g_get_tmp_dir (),
184 						      g_strdup_printf ("link_debug.%d", getpid ()),
185 						      NULL);
186 		int fd;
187 		fd = g_open (stderr_file, O_WRONLY|O_CREAT, 0666);
188 		if (fd >= 0) {
189 			char *prgname = g_get_prgname ();
190 			d_printf ("Redirecting stderr of %s to %s\n",
191 				  (prgname ? prgname : "this process"), stderr_file);
192 			dup2 (fd, fileno (stderr));
193 			close (fd);
194 		}
195 	        d_printf ("stderr redirected here\n");
196 	}
197 #endif
198 
199 	if (thread_safe && !g_thread_supported ())
200 		g_thread_init (NULL);
201 
202 	link_is_thread_safe = (thread_safe && g_thread_supported());
203 
204 	g_type_init ();
205 
206 #ifdef SIGPIPE
207 	/*
208 	 * Link's raison d'etre is for ORBit2 and Bonobo
209 	 *
210 	 * In Bonobo, components and containers must not crash if the
211 	 * remote end crashes.  If a remote server crashes and then we
212 	 * try to make a CORBA call on it, we may get a SIGPIPE.  So,
213 	 * for lack of a better solution, we ignore SIGPIPE here.  This
214 	 * is open for reconsideration in the future.
215 	 *
216 	 * When SIGPIPE is ignored, write() calls which would
217 	 * ordinarily trigger a signal will instead return -1 and set
218 	 * errno to EPIPE.  So linc will be able to catch these
219 	 * errors instead of letting them kill the component.
220 	 *
221 	 * Possibilities are the MSG_PEEK trick, where you test if the
222 	 * connection is dead right before doing the writev().  That
223 	 * approach has two problems:
224 	 *
225 	 *   1. There is the possibility of a race condition, where
226 	 *      the remote end calls right after the test, and right
227 	 *      before the writev().
228 	 *
229 	 *   2. An extra system call per write might be regarded by
230 	 *      some as a performance hit.
231 	 *
232 	 * Another possibility is to surround the call to writev() in
233 	 * link_connection_writev (linc-connection.c) with something like
234 	 * this:
235 	 *
236 	 *		link_ignore_sigpipe = 1;
237 	 *
238 	 *		result = writev ( ... );
239 	 *
240 	 *		link_ignore_sigpipe = 0;
241 	 *
242 	 * The SIGPIPE signal handler will check the global
243 	 * link_ignore_sigpipe variable and ignore the signal if it
244 	 * is 1.  If it is 0, it can proxy to the user's original
245 	 * signal handler.  This is a real possibility.
246 	 */
247 	signal (SIGPIPE, SIG_IGN);
248 #endif
249 
250 	link_context = g_main_context_new ();
251 	link_loop    = g_main_loop_new (link_context, TRUE);
252 
253 #ifdef LINK_SSL_SUPPORT
254 	SSLeay_add_ssl_algorithms ();
255 	link_ssl_method = SSLv23_method ();
256 	link_ssl_ctx = SSL_CTX_new (link_ssl_method);
257 #endif
258 
259 	link_main_lock = link_mutex_new ();
260 	link_cmd_queue_lock = link_mutex_new ();
261 	if (link_is_thread_safe) {
262 		link_main_cond = g_cond_new ();
263 		link_cmd_queue_cond = g_cond_new ();
264 	}
265 
266 #ifdef HAVE_WINSOCK2_H
267 	{
268 		WSADATA wsadata;
269 		if (WSAStartup (MAKEWORD (2, 0), &wsadata) != 0)
270 			g_error ("Windows Sockets could not be initialized");
271 	}
272 #endif
273 }
274 
275 /**
276  * link_main_iteration:
277  * @block_for_reply: whether we should wait for a reply
278  *
279  * This routine iterates the linc mainloop, which has
280  * only the linc sources registered against it.
281  **/
282 void
link_main_iteration(gboolean block_for_reply)283 link_main_iteration (gboolean block_for_reply)
284 {
285 	g_main_context_iteration (
286 		link_context, block_for_reply);
287 }
288 
289 /**
290  * link_main_pending:
291  *
292  * determines if the linc mainloop has any pending work to process.
293  *
294  * Return value: TRUE if the linc mainloop has any pending work to process.
295  **/
296 gboolean
link_main_pending(void)297 link_main_pending (void)
298 {
299 	return g_main_context_pending (link_context);
300 }
301 
302 /**
303  * link_main_loop_run:
304  *
305  * Runs the linc mainloop; blocking until the loop is exited.
306  **/
307 void
link_main_loop_run(void)308 link_main_loop_run (void)
309 {
310 	g_main_loop_run (link_loop);
311 }
312 
313 /**
314  * link_mutex_new:
315  *
316  * Creates a mutex, iff threads are supported, initialized etc.
317  *
318  * Return value: a new GMutex, or NULL if one is not required.
319  **/
320 GMutex *
link_mutex_new(void)321 link_mutex_new (void)
322 {
323 	if (link_is_thread_safe)
324 		return g_mutex_new ();
325 	else
326 		return NULL;
327 }
328 
329 gboolean
link_in_io_thread(void)330 link_in_io_thread (void)
331 {
332 	return (!link_io_thread ||
333 		g_thread_self() == link_io_thread);
334 }
335 
336 GMainContext *
link_main_get_context(void)337 link_main_get_context (void)
338 {
339 	return link_context;
340 }
341 
342 /*
343  *   This method is unreliable, and for use
344  * only for debugging.
345  */
346 gboolean
link_mutex_is_locked(GMutex * lock)347 link_mutex_is_locked (GMutex *lock)
348 {
349 #ifdef __GLIBC__
350 	gboolean result = TRUE;
351 
352 	if (lock && g_mutex_trylock (lock)) {
353 		result = FALSE;
354 		g_mutex_unlock (lock);
355 	}
356 
357 	return result;
358 #else
359 	/*
360 	 * On at least Solaris & BSD if we link our
361 	 * app without -lthread, and pull in ORBit2
362 	 * with threading enabled, we get NOP pthread
363 	 * operations. This is fine mostly, but we get
364 	 * bogus return values from trylock which screws
365 	 * our debugging.
366 	 */
367 	d_printf ("hosed system is_lock-ing\n");
368 	return TRUE;
369 #endif
370 }
371 
372 void
link_shutdown(void)373 link_shutdown (void)
374 {
375 	if (link_loop) /* break into the linc loop */
376 		g_main_loop_quit (link_loop);
377 
378 	if (link_thread_loop)
379 		g_main_loop_quit (link_thread_loop);
380 
381 	if (link_io_thread) {
382 		g_thread_join (link_io_thread);
383 		link_io_thread = NULL;
384 	}
385 }
386 
387 GMainContext *
link_thread_io_context(void)388 link_thread_io_context (void)
389 {
390 	return link_thread_context;
391 }
392 
393 static gpointer
link_io_thread_fn(gpointer data)394 link_io_thread_fn (gpointer data)
395 {
396 	g_main_loop_run (link_thread_loop);
397 
398 	/* FIXME: need to be able to quit without waiting ... */
399 
400 	/* Asked to quit - so ...
401 	 * a) stop accepting inputs [ kill servers ]
402 	 * b) flush outgoing queued data etc. (oneways)
403 	 * c) unref all leakable resources.
404 	 */
405 
406 	link_connections_close ();
407 
408 	/* A tad of shutdown */
409 	LINK_MUTEX_LOCK (link_cmd_queue_lock);
410 	if (LINK_WAKEUP_WRITE >= 0) {
411 #ifdef HAVE_WINSOCK2_H
412 		closesocket (LINK_WAKEUP_WRITE);
413 		closesocket (LINK_WAKEUP_POLL);
414 #else
415 		close (LINK_WAKEUP_WRITE);
416 		close (LINK_WAKEUP_POLL);
417 #endif
418 		LINK_WAKEUP_WRITE = -1;
419 		LINK_WAKEUP_POLL = -1;
420 	}
421 	LINK_MUTEX_UNLOCK (link_cmd_queue_lock);
422 
423 	if (link_main_source) {
424 		g_source_destroy (link_main_source);
425 		g_source_unref (link_main_source);
426 		link_main_source = NULL;
427 	}
428 
429 	return NULL;
430 }
431 
432 static void
link_exec_set_io_thread(gpointer data,gboolean immediate)433 link_exec_set_io_thread (gpointer data, gboolean immediate)
434 {
435 	GError *error = NULL;
436 	gboolean to_io_thread = TRUE;
437 
438 	link_lock ();
439 	if (link_is_io_in_thread) {
440 		link_unlock ();
441 		return;
442 	}
443 
444 	g_mutex_lock (link_cmd_queue_lock);
445 
446 	link_is_io_in_thread = TRUE;
447 
448 	link_thread_context = g_main_context_new ();
449 	link_thread_loop = g_main_loop_new (link_thread_context, TRUE);
450 
451 	link_connections_move_io_T (to_io_thread);
452 	link_servers_move_io_T     (to_io_thread);
453 
454 	if (link_pipe (link_wakeup_fds) < 0)
455 		g_error ("Can't create CORBA main-thread wakeup pipe");
456 
457 	link_main_source = link_source_create_watch
458 		(link_thread_context, LINK_WAKEUP_POLL,
459 		 NULL, (G_IO_IN | G_IO_PRI),
460 		 link_mainloop_handle_input, NULL);
461 
462 	link_io_thread = g_thread_create_full
463 		(link_io_thread_fn, NULL, 256 * 1024, TRUE, FALSE,
464 		 G_THREAD_PRIORITY_NORMAL, &error);
465 
466 	if (!link_io_thread || error)
467 		g_error ("Failed to create linc worker thread");
468 
469 	g_main_loop_quit (link_loop);
470 
471 	g_mutex_unlock (link_cmd_queue_lock);
472 	link_unlock ();
473 }
474 
475 void
link_set_io_thread(gboolean io_in_thread)476 link_set_io_thread (gboolean io_in_thread)
477 {
478 	LinkSyncCommand cmd = { { 0 }, 0 };
479 
480 	cmd.cmd.type = LINK_COMMAND_SET_IO_THREAD;
481 
482 	link_exec_command (&cmd.cmd);
483 }
484 
485 static void
link_dispatch_command(gpointer data,gboolean immediate)486 link_dispatch_command (gpointer data, gboolean immediate)
487 {
488 	LinkCommand *cmd = data;
489 	switch (cmd->type) {
490 	case LINK_COMMAND_SET_CONDITION:
491 		link_connection_exec_set_condition (data, immediate);
492 		break;
493 	case LINK_COMMAND_DISCONNECT:
494 		link_connection_exec_disconnect (data, immediate);
495 		break;
496 	case LINK_COMMAND_SET_IO_THREAD:
497 		link_exec_set_io_thread (data, immediate);
498 		break;
499 	case LINK_COMMAND_CNX_UNREF:
500 		link_connection_exec_cnx_unref (data, immediate);
501 		break;
502 	default:
503 		g_error ("Unimplemented (%d)", cmd->type);
504 		break;
505 	}
506 }
507 
508 void
link_lock(void)509 link_lock (void)
510 {
511 	if (link_main_lock)
512 		g_mutex_lock (link_main_lock);
513 }
514 
515 void
link_unlock(void)516 link_unlock (void)
517 {
518 	if (link_main_lock)
519 		g_mutex_unlock (link_main_lock);
520 }
521 
522 void
link_signal(void)523 link_signal (void)
524 {
525 	if (link_is_thread_safe && link_is_io_in_thread) {
526 		g_assert (link_main_cond != NULL);
527 		g_assert (link_is_locked ());
528 		g_cond_broadcast (link_main_cond);
529 	}
530 }
531 
532 void
link_wait(void)533 link_wait (void)
534 {
535 	if (!(link_is_thread_safe && link_is_io_in_thread)) {
536 		link_unlock ();
537 		link_main_iteration (TRUE);
538 		link_lock ();
539 	} else {
540 		g_assert (link_main_cond != NULL);
541 		g_cond_wait (link_main_cond, link_main_lock);
542 	}
543 }
544 
545 
546 
547 gboolean
link_is_locked(void)548 link_is_locked (void)
549 {
550 	return link_mutex_is_locked (link_main_lock);
551 }
552 
553 /* Hack */
554 guint
link_io_thread_add_timeout(guint interval,GSourceFunc function,gpointer data)555 link_io_thread_add_timeout (guint       interval,
556                             GSourceFunc function,
557                             gpointer    data)
558 {
559 	guint id;
560 	GSource *tsrc;
561 
562 	if (!link_thread_safe())
563 		return 0;
564 
565 	tsrc = g_timeout_source_new (interval);
566 	g_source_set_priority (tsrc, G_PRIORITY_HIGH_IDLE);
567 	g_source_set_callback (tsrc, function, data, NULL);
568 	g_source_set_can_recurse (tsrc, TRUE);
569 	id = g_source_attach (tsrc, link_thread_context);
570 	g_source_unref (tsrc);
571 
572 	return id;
573 }
574 
575 void
link_io_thread_remove_timeout(guint source_id)576 link_io_thread_remove_timeout (guint source_id)
577 {
578 	GSource *tsrc;
579 
580 	if (!source_id)
581 		return;
582 
583 	tsrc = g_main_context_find_source_by_id (link_thread_context, source_id);
584 	g_source_destroy (tsrc);
585 }
586