1 /*
2 * linc.c: This file is part of the linc library.
3 *
4 * Authors:
5 * Elliot Lee (sopwith@redhat.com)
6 * Michael Meeks (michael@ximian.com)
7 * Mark McLouglin (mark@skynet.ie) & others
8 *
9 * Copyright 2001, Red Hat, Inc., Ximian, Inc.,
10 * Sun Microsystems, Inc.
11 */
12 #include <config.h>
13
14 #include <stdio.h>
15 #include <errno.h>
16 #ifdef HAVE_UNISTD_H
17 # include <unistd.h>
18 #endif
19 #include <signal.h>
20 #include <fcntl.h>
21 #include "linc-private.h"
22 #include "linc-compat.h"
23
24 #include <glib/gstdio.h>
25
26 /* whether we do locking or not */
27 static gboolean link_is_thread_safe = TRUE;
28 /* an inferior loop/context for std. processing */
29 GMainLoop *link_loop = NULL;
30 static GMainContext *link_context = NULL;
31 /* an inferior context for the I/O thread */
32 static GThread *link_io_thread = NULL;
33 static GMainLoop *link_thread_loop = NULL;
34 static GMainContext *link_thread_context = NULL;
35 static gboolean link_is_io_in_thread = FALSE;
36
37 /* a big global lock for link */
38 static GMutex *link_main_lock;
39 static GCond *link_main_cond;
40 /* command dispatch to the I/O loop */
41 static GMutex *link_cmd_queue_lock = NULL;
42 static GCond *link_cmd_queue_cond = NULL;
43 static GList *link_cmd_queue = NULL;
44
45 static int link_wakeup_fds[2] = { -1, -1 };
46 #define LINK_WAKEUP_POLL link_wakeup_fds [0]
47 #define LINK_WAKEUP_WRITE link_wakeup_fds [1]
48 static GSource *link_main_source = NULL;
49
50 #ifdef LINK_SSL_SUPPORT
51 SSL_METHOD *link_ssl_method;
52 SSL_CTX *link_ssl_ctx;
53 #endif
54
55 static void link_dispatch_command (gpointer data, gboolean immediate);
56
57 gboolean
link_thread_io(void)58 link_thread_io (void)
59 {
60 gboolean result;
61
62 /* FIXME: re-factor this to avoid locking */
63 result = link_io_thread != NULL;
64
65 return result;
66 }
67
68 gboolean
link_thread_safe(void)69 link_thread_safe (void)
70 {
71 return link_is_thread_safe;
72 }
73
74 static gboolean
cmd_is_sync(LinkCommand * cmd)75 cmd_is_sync (LinkCommand *cmd)
76 {
77 return (cmd->type == LINK_COMMAND_SET_IO_THREAD) ||
78 (cmd->type == LINK_COMMAND_CNX_UNREF);
79 }
80
81 static gboolean
link_mainloop_handle_input(GIOChannel * source,GIOCondition condition,gpointer data)82 link_mainloop_handle_input (GIOChannel *source,
83 GIOCondition condition,
84 gpointer data)
85 {
86 char c;
87 GList *l, *queue;
88
89 g_mutex_lock (link_cmd_queue_lock);
90
91 #ifdef HAVE_WINSOCK2_H
92 recv (LINK_WAKEUP_POLL, &c, sizeof (c), 0);
93 #else
94 read (LINK_WAKEUP_POLL, &c, sizeof (c));
95 #endif
96 queue = link_cmd_queue;
97 link_cmd_queue = NULL;
98
99 g_mutex_unlock (link_cmd_queue_lock);
100
101 for (l = queue; l; l = l->next) {
102 gboolean sync;
103
104 sync = cmd_is_sync (l->data);
105
106 link_dispatch_command (l->data, FALSE);
107
108 if (sync) {
109 g_mutex_lock (link_cmd_queue_lock);
110 ((LinkSyncCommand *)l->data)->complete = TRUE;
111 g_cond_broadcast (link_cmd_queue_cond);
112 g_mutex_unlock (link_cmd_queue_lock);
113 }
114 }
115
116 g_list_free (queue);
117
118 return TRUE;
119 }
120
121 void
link_exec_command(LinkCommand * cmd)122 link_exec_command (LinkCommand *cmd)
123 {
124 int res = 0;
125
126 if (link_in_io_thread ()) {
127 link_dispatch_command (cmd, TRUE);
128 return;
129 }
130
131 LINK_MUTEX_LOCK (link_cmd_queue_lock);
132
133 if (LINK_WAKEUP_WRITE == -1) { /* shutdown main loop */
134 LINK_MUTEX_UNLOCK (link_cmd_queue_lock);
135 link_dispatch_command (cmd, TRUE);
136 return;
137 }
138
139 if (!link_cmd_queue) {
140 char c = 'L'; /* magic */
141 #ifdef HAVE_WINSOCK2_H
142 while ((res = send (LINK_WAKEUP_WRITE, &c, sizeof (c), 0)) == SOCKET_ERROR &&
143 (WSAGetLastError () == WSAEWOULDBLOCK));
144 #else
145 while ((res = write (LINK_WAKEUP_WRITE, &c, sizeof (c))) < 0 &&
146 (errno == EAGAIN || errno == EINTR));
147 #endif
148 }
149
150 link_cmd_queue = g_list_append (link_cmd_queue, cmd);
151
152 if (cmd_is_sync (cmd))
153 while (!((LinkSyncCommand *)cmd)->complete)
154 g_cond_wait (link_cmd_queue_cond,
155 link_cmd_queue_lock);
156
157 LINK_MUTEX_UNLOCK (link_cmd_queue_lock);
158
159 if (res < 0)
160 g_error ("Failed to write to linc wakeup socket %d 0x%x(%d) (%d)",
161 res, errno, errno, LINK_WAKEUP_WRITE);
162 }
163
#if defined (CONNECTION_DEBUG) && defined (CONNECTION_DEBUG_FLAG)
/* Switched on by link_init() when LINK_CONNECTION_DEBUG is set */
gboolean link_connection_debug_flag = FALSE;
#endif
167
168 /**
169 * link_init:
170 * @thread_safe: if we want thread safety enabled.
171 *
172 * Initialize linc.
173 **/
174 void
link_init(gboolean thread_safe)175 link_init (gboolean thread_safe)
176 {
177 #if defined (CONNECTION_DEBUG) && defined (CONNECTION_DEBUG_FLAG)
178 if (getenv ("LINK_CONNECTION_DEBUG"))
179 link_connection_debug_flag = TRUE;
180 if (link_connection_debug_flag &&
181 getenv ("LINK_PER_PROCESS_STDERR") &&
182 fileno (stderr) >= 0) {
183 char *stderr_file = g_build_filename (g_get_tmp_dir (),
184 g_strdup_printf ("link_debug.%d", getpid ()),
185 NULL);
186 int fd;
187 fd = g_open (stderr_file, O_WRONLY|O_CREAT, 0666);
188 if (fd >= 0) {
189 char *prgname = g_get_prgname ();
190 d_printf ("Redirecting stderr of %s to %s\n",
191 (prgname ? prgname : "this process"), stderr_file);
192 dup2 (fd, fileno (stderr));
193 close (fd);
194 }
195 d_printf ("stderr redirected here\n");
196 }
197 #endif
198
199 if (thread_safe && !g_thread_supported ())
200 g_thread_init (NULL);
201
202 link_is_thread_safe = (thread_safe && g_thread_supported());
203
204 g_type_init ();
205
206 #ifdef SIGPIPE
207 /*
208 * Link's raison d'etre is for ORBit2 and Bonobo
209 *
210 * In Bonobo, components and containers must not crash if the
211 * remote end crashes. If a remote server crashes and then we
212 * try to make a CORBA call on it, we may get a SIGPIPE. So,
213 * for lack of a better solution, we ignore SIGPIPE here. This
214 * is open for reconsideration in the future.
215 *
216 * When SIGPIPE is ignored, write() calls which would
217 * ordinarily trigger a signal will instead return -1 and set
218 * errno to EPIPE. So linc will be able to catch these
219 * errors instead of letting them kill the component.
220 *
221 * Possibilities are the MSG_PEEK trick, where you test if the
222 * connection is dead right before doing the writev(). That
223 * approach has two problems:
224 *
225 * 1. There is the possibility of a race condition, where
226 * the remote end calls right after the test, and right
227 * before the writev().
228 *
229 * 2. An extra system call per write might be regarded by
230 * some as a performance hit.
231 *
232 * Another possibility is to surround the call to writev() in
233 * link_connection_writev (linc-connection.c) with something like
234 * this:
235 *
236 * link_ignore_sigpipe = 1;
237 *
238 * result = writev ( ... );
239 *
240 * link_ignore_sigpipe = 0;
241 *
242 * The SIGPIPE signal handler will check the global
243 * link_ignore_sigpipe variable and ignore the signal if it
244 * is 1. If it is 0, it can proxy to the user's original
245 * signal handler. This is a real possibility.
246 */
247 signal (SIGPIPE, SIG_IGN);
248 #endif
249
250 link_context = g_main_context_new ();
251 link_loop = g_main_loop_new (link_context, TRUE);
252
253 #ifdef LINK_SSL_SUPPORT
254 SSLeay_add_ssl_algorithms ();
255 link_ssl_method = SSLv23_method ();
256 link_ssl_ctx = SSL_CTX_new (link_ssl_method);
257 #endif
258
259 link_main_lock = link_mutex_new ();
260 link_cmd_queue_lock = link_mutex_new ();
261 if (link_is_thread_safe) {
262 link_main_cond = g_cond_new ();
263 link_cmd_queue_cond = g_cond_new ();
264 }
265
266 #ifdef HAVE_WINSOCK2_H
267 {
268 WSADATA wsadata;
269 if (WSAStartup (MAKEWORD (2, 0), &wsadata) != 0)
270 g_error ("Windows Sockets could not be initialized");
271 }
272 #endif
273 }
274
275 /**
276 * link_main_iteration:
277 * @block_for_reply: whether we should wait for a reply
278 *
279 * This routine iterates the linc mainloop, which has
280 * only the linc sources registered against it.
281 **/
282 void
link_main_iteration(gboolean block_for_reply)283 link_main_iteration (gboolean block_for_reply)
284 {
285 g_main_context_iteration (
286 link_context, block_for_reply);
287 }
288
289 /**
290 * link_main_pending:
291 *
292 * determines if the linc mainloop has any pending work to process.
293 *
294 * Return value: TRUE if the linc mainloop has any pending work to process.
295 **/
296 gboolean
link_main_pending(void)297 link_main_pending (void)
298 {
299 return g_main_context_pending (link_context);
300 }
301
302 /**
303 * link_main_loop_run:
304 *
305 * Runs the linc mainloop; blocking until the loop is exited.
306 **/
307 void
link_main_loop_run(void)308 link_main_loop_run (void)
309 {
310 g_main_loop_run (link_loop);
311 }
312
313 /**
314 * link_mutex_new:
315 *
316 * Creates a mutex, iff threads are supported, initialized etc.
317 *
318 * Return value: a new GMutex, or NULL if one is not required.
319 **/
320 GMutex *
link_mutex_new(void)321 link_mutex_new (void)
322 {
323 if (link_is_thread_safe)
324 return g_mutex_new ();
325 else
326 return NULL;
327 }
328
329 gboolean
link_in_io_thread(void)330 link_in_io_thread (void)
331 {
332 return (!link_io_thread ||
333 g_thread_self() == link_io_thread);
334 }
335
336 GMainContext *
link_main_get_context(void)337 link_main_get_context (void)
338 {
339 return link_context;
340 }
341
342 /*
343 * This method is unreliable, and for use
344 * only for debugging.
345 */
346 gboolean
link_mutex_is_locked(GMutex * lock)347 link_mutex_is_locked (GMutex *lock)
348 {
349 #ifdef __GLIBC__
350 gboolean result = TRUE;
351
352 if (lock && g_mutex_trylock (lock)) {
353 result = FALSE;
354 g_mutex_unlock (lock);
355 }
356
357 return result;
358 #else
359 /*
360 * On at least Solaris & BSD if we link our
361 * app without -lthread, and pull in ORBit2
362 * with threading enabled, we get NOP pthread
363 * operations. This is fine mostly, but we get
364 * bogus return values from trylock which screws
365 * our debugging.
366 */
367 d_printf ("hosed system is_lock-ing\n");
368 return TRUE;
369 #endif
370 }
371
372 void
link_shutdown(void)373 link_shutdown (void)
374 {
375 if (link_loop) /* break into the linc loop */
376 g_main_loop_quit (link_loop);
377
378 if (link_thread_loop)
379 g_main_loop_quit (link_thread_loop);
380
381 if (link_io_thread) {
382 g_thread_join (link_io_thread);
383 link_io_thread = NULL;
384 }
385 }
386
387 GMainContext *
link_thread_io_context(void)388 link_thread_io_context (void)
389 {
390 return link_thread_context;
391 }
392
393 static gpointer
link_io_thread_fn(gpointer data)394 link_io_thread_fn (gpointer data)
395 {
396 g_main_loop_run (link_thread_loop);
397
398 /* FIXME: need to be able to quit without waiting ... */
399
400 /* Asked to quit - so ...
401 * a) stop accepting inputs [ kill servers ]
402 * b) flush outgoing queued data etc. (oneways)
403 * c) unref all leakable resources.
404 */
405
406 link_connections_close ();
407
408 /* A tad of shutdown */
409 LINK_MUTEX_LOCK (link_cmd_queue_lock);
410 if (LINK_WAKEUP_WRITE >= 0) {
411 #ifdef HAVE_WINSOCK2_H
412 closesocket (LINK_WAKEUP_WRITE);
413 closesocket (LINK_WAKEUP_POLL);
414 #else
415 close (LINK_WAKEUP_WRITE);
416 close (LINK_WAKEUP_POLL);
417 #endif
418 LINK_WAKEUP_WRITE = -1;
419 LINK_WAKEUP_POLL = -1;
420 }
421 LINK_MUTEX_UNLOCK (link_cmd_queue_lock);
422
423 if (link_main_source) {
424 g_source_destroy (link_main_source);
425 g_source_unref (link_main_source);
426 link_main_source = NULL;
427 }
428
429 return NULL;
430 }
431
432 static void
link_exec_set_io_thread(gpointer data,gboolean immediate)433 link_exec_set_io_thread (gpointer data, gboolean immediate)
434 {
435 GError *error = NULL;
436 gboolean to_io_thread = TRUE;
437
438 link_lock ();
439 if (link_is_io_in_thread) {
440 link_unlock ();
441 return;
442 }
443
444 g_mutex_lock (link_cmd_queue_lock);
445
446 link_is_io_in_thread = TRUE;
447
448 link_thread_context = g_main_context_new ();
449 link_thread_loop = g_main_loop_new (link_thread_context, TRUE);
450
451 link_connections_move_io_T (to_io_thread);
452 link_servers_move_io_T (to_io_thread);
453
454 if (link_pipe (link_wakeup_fds) < 0)
455 g_error ("Can't create CORBA main-thread wakeup pipe");
456
457 link_main_source = link_source_create_watch
458 (link_thread_context, LINK_WAKEUP_POLL,
459 NULL, (G_IO_IN | G_IO_PRI),
460 link_mainloop_handle_input, NULL);
461
462 link_io_thread = g_thread_create_full
463 (link_io_thread_fn, NULL, 256 * 1024, TRUE, FALSE,
464 G_THREAD_PRIORITY_NORMAL, &error);
465
466 if (!link_io_thread || error)
467 g_error ("Failed to create linc worker thread");
468
469 g_main_loop_quit (link_loop);
470
471 g_mutex_unlock (link_cmd_queue_lock);
472 link_unlock ();
473 }
474
475 void
link_set_io_thread(gboolean io_in_thread)476 link_set_io_thread (gboolean io_in_thread)
477 {
478 LinkSyncCommand cmd = { { 0 }, 0 };
479
480 cmd.cmd.type = LINK_COMMAND_SET_IO_THREAD;
481
482 link_exec_command (&cmd.cmd);
483 }
484
485 static void
link_dispatch_command(gpointer data,gboolean immediate)486 link_dispatch_command (gpointer data, gboolean immediate)
487 {
488 LinkCommand *cmd = data;
489 switch (cmd->type) {
490 case LINK_COMMAND_SET_CONDITION:
491 link_connection_exec_set_condition (data, immediate);
492 break;
493 case LINK_COMMAND_DISCONNECT:
494 link_connection_exec_disconnect (data, immediate);
495 break;
496 case LINK_COMMAND_SET_IO_THREAD:
497 link_exec_set_io_thread (data, immediate);
498 break;
499 case LINK_COMMAND_CNX_UNREF:
500 link_connection_exec_cnx_unref (data, immediate);
501 break;
502 default:
503 g_error ("Unimplemented (%d)", cmd->type);
504 break;
505 }
506 }
507
508 void
link_lock(void)509 link_lock (void)
510 {
511 if (link_main_lock)
512 g_mutex_lock (link_main_lock);
513 }
514
515 void
link_unlock(void)516 link_unlock (void)
517 {
518 if (link_main_lock)
519 g_mutex_unlock (link_main_lock);
520 }
521
522 void
link_signal(void)523 link_signal (void)
524 {
525 if (link_is_thread_safe && link_is_io_in_thread) {
526 g_assert (link_main_cond != NULL);
527 g_assert (link_is_locked ());
528 g_cond_broadcast (link_main_cond);
529 }
530 }
531
532 void
link_wait(void)533 link_wait (void)
534 {
535 if (!(link_is_thread_safe && link_is_io_in_thread)) {
536 link_unlock ();
537 link_main_iteration (TRUE);
538 link_lock ();
539 } else {
540 g_assert (link_main_cond != NULL);
541 g_cond_wait (link_main_cond, link_main_lock);
542 }
543 }
544
545
546
547 gboolean
link_is_locked(void)548 link_is_locked (void)
549 {
550 return link_mutex_is_locked (link_main_lock);
551 }
552
553 /* Hack */
554 guint
link_io_thread_add_timeout(guint interval,GSourceFunc function,gpointer data)555 link_io_thread_add_timeout (guint interval,
556 GSourceFunc function,
557 gpointer data)
558 {
559 guint id;
560 GSource *tsrc;
561
562 if (!link_thread_safe())
563 return 0;
564
565 tsrc = g_timeout_source_new (interval);
566 g_source_set_priority (tsrc, G_PRIORITY_HIGH_IDLE);
567 g_source_set_callback (tsrc, function, data, NULL);
568 g_source_set_can_recurse (tsrc, TRUE);
569 id = g_source_attach (tsrc, link_thread_context);
570 g_source_unref (tsrc);
571
572 return id;
573 }
574
575 void
link_io_thread_remove_timeout(guint source_id)576 link_io_thread_remove_timeout (guint source_id)
577 {
578 GSource *tsrc;
579
580 if (!source_id)
581 return;
582
583 tsrc = g_main_context_find_source_by_id (link_thread_context, source_id);
584 g_source_destroy (tsrc);
585 }
586