1 /*
2    (c) Copyright 2001-2009  The world wide DirectFB Open Source Community (directfb.org)
3    (c) Copyright 2000-2004  Convergence (integrated media) GmbH
4 
5    All rights reserved.
6 
7    Written by Denis Oliver Kropp <dok@directfb.org>,
8               Andreas Hundt <andi@fischlustig.de>,
9               Sven Neumann <neo@directfb.org>,
10               Ville Syrjälä <syrjala@sci.fi> and
11               Claudio Ciccani <klan@users.sf.net>.
12 
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 2 of the License, or (at your option) any later version.
17 
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22 
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, write to the
25    Free Software Foundation, Inc., 59 Temple Place - Suite 330,
26    Boston, MA 02111-1307, USA.
27 */
28 
29 #include <config.h>
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include <unistd.h>
33 #include <signal.h>
34 #include <errno.h>
35 #include <time.h>
36 #include <fcntl.h>
37 
38 #include <sys/param.h>
39 #include <sys/types.h>
40 #include <sys/time.h>
41 #include <sys/stat.h>
42 #include <sys/mman.h>
43 #include <sys/utsname.h>
44 
45 #include <direct/clock.h>
46 #include <direct/debug.h>
47 #include <direct/direct.h>
48 #include <direct/mem.h>
49 #include <direct/messages.h>
50 #include <direct/signals.h>
51 #include <direct/thread.h>
52 #include <direct/trace.h>
53 #include <direct/util.h>
54 
55 #include <fusion/build.h>
56 #include <fusion/conf.h>
57 #include <fusion/types.h>
58 
59 #include "fusion_internal.h"
60 
61 #include <fusion/shmalloc.h>
62 
63 #include <fusion/shm/shm.h>
64 
65 
66 #if FUSION_BUILD_MULTI
67 
68 D_DEBUG_DOMAIN( Fusion_Main,          "Fusion/Main",          "Fusion - High level IPC" );
69 D_DEBUG_DOMAIN( Fusion_Main_Dispatch, "Fusion/Main/Dispatch", "Fusion - High level IPC Dispatch" );
70 
71 /**********************************************************************************************************************/
72 
73 static void                      *fusion_dispatch_loop ( DirectThread *thread,
74                                                          void         *arg );
75 
76 /**********************************************************************************************************************/
77 
78 static void                       fusion_fork_handler_prepare( void );
79 static void                       fusion_fork_handler_parent( void );
80 static void                       fusion_fork_handler_child( void );
81 
82 /**********************************************************************************************************************/
83 
84 static FusionWorld     *fusion_worlds[FUSION_MAX_WORLDS];
85 static pthread_mutex_t  fusion_worlds_lock = PTHREAD_MUTEX_INITIALIZER;
86 static pthread_once_t   fusion_init_once   = PTHREAD_ONCE_INIT;
87 
88 /**********************************************************************************************************************/
89 
90 int
_fusion_fd(const FusionWorldShared * shared)91 _fusion_fd( const FusionWorldShared *shared )
92 {
93      int          index;
94      FusionWorld *world;
95 
96      D_MAGIC_ASSERT( shared, FusionWorldShared );
97 
98      index = shared->world_index;
99 
100      D_ASSERT( index >= 0 );
101      D_ASSERT( index < FUSION_MAX_WORLDS );
102 
103      world = fusion_worlds[index];
104 
105      D_MAGIC_ASSERT( world, FusionWorld );
106 
107      return world->fusion_fd;
108 }
109 
110 FusionID
_fusion_id(const FusionWorldShared * shared)111 _fusion_id( const FusionWorldShared *shared )
112 {
113      int          index;
114      FusionWorld *world;
115 
116      D_MAGIC_ASSERT( shared, FusionWorldShared );
117 
118      index = shared->world_index;
119 
120      D_ASSERT( index >= 0 );
121      D_ASSERT( index < FUSION_MAX_WORLDS );
122 
123      world = fusion_worlds[index];
124 
125      D_MAGIC_ASSERT( world, FusionWorld );
126 
127      return world->fusion_id;
128 }
129 
130 FusionWorld *
_fusion_world(const FusionWorldShared * shared)131 _fusion_world( const FusionWorldShared *shared )
132 {
133      int          index;
134      FusionWorld *world;
135 
136      D_MAGIC_ASSERT( shared, FusionWorldShared );
137 
138      index = shared->world_index;
139 
140      D_ASSERT( index >= 0 );
141      D_ASSERT( index < FUSION_MAX_WORLDS );
142 
143      world = fusion_worlds[index];
144 
145      D_MAGIC_ASSERT( world, FusionWorld );
146 
147      return world;
148 }
149 
150 /**********************************************************************************************************************/
151 
152 static void
init_once(void)153 init_once( void )
154 {
155      struct utsname uts;
156      int            i, j, k, l;
157 
158      pthread_atfork( fusion_fork_handler_prepare, fusion_fork_handler_parent, fusion_fork_handler_child );
159 
160      if (uname( &uts ) < 0) {
161           D_PERROR( "Fusion/Init: uname() failed!\n" );
162           return;
163      }
164 
165 #if !FUSION_BUILD_KERNEL
166      D_INFO( "Fusion/Init: "
167              "Builtin Implementation is still experimental! Crash/Deadlocks might occur!\n" );
168 #endif
169 
170      if (fusion_config->madv_remove_force) {
171           if (fusion_config->madv_remove)
172                D_INFO( "Fusion/SHM: Using MADV_REMOVE (forced)\n" );
173           else
174                D_INFO( "Fusion/SHM: Not using MADV_REMOVE (forced)!\n" );
175      }
176      else {
177           switch (sscanf( uts.release, "%d.%d.%d.%d", &i, &j, &k, &l )) {
178                case 3:
179                     l = 0;
180                case 4:
181                     if (((i << 24) | (j << 16) | (k << 8) | l) >= 0x02061302)
182                          fusion_config->madv_remove = true;
183                     break;
184 
185                default:
186                     D_WARN( "could not parse kernel version '%s'", uts.release );
187           }
188 
189           if (fusion_config->madv_remove)
190                D_INFO( "Fusion/SHM: Using MADV_REMOVE (%d.%d.%d.%d >= 2.6.19.2)\n", i, j, k, l );
191           else
192                D_INFO( "Fusion/SHM: NOT using MADV_REMOVE (%d.%d.%d.%d < 2.6.19.2)! [0x%08x]\n",
193                        i, j, k, l, (i << 24) | (j << 16) | (k << 8) | l );
194      }
195 }
196 
197 /**********************************************************************************************************************/
198 
199 #if FUSION_BUILD_KERNEL
200 
201 static void
fusion_world_fork(FusionWorld * world)202 fusion_world_fork( FusionWorld *world )
203 {
204      int                fd;
205      char               buf1[20];
206      char               buf2[20];
207      FusionEnter        enter;
208      FusionFork         fork;
209      FusionWorldShared *shared;
210 
211      D_MAGIC_ASSERT( world, FusionWorld );
212 
213      shared = world->shared;
214 
215      D_MAGIC_ASSERT( shared, FusionWorldShared );
216 
217      snprintf( buf1, sizeof(buf1), "/dev/fusion%d", shared->world_index );
218      snprintf( buf2, sizeof(buf2), "/dev/fusion/%d", shared->world_index );
219 
220      /* Open Fusion Kernel Device. */
221      fd = direct_try_open( buf1, buf2, O_RDWR | O_NONBLOCK, true );
222      if (fd < 0) {
223           D_PERROR( "Fusion/Main: Reopening fusion device (world %d) failed!\n", shared->world_index );
224           raise(5);
225      }
226 
227      /* Drop "identity" when running another program. */
228      if (fcntl( fd, F_SETFD, FD_CLOEXEC ) < 0)
229           D_PERROR( "Fusion/Init: Setting FD_CLOEXEC flag failed!\n" );
230 
231      /* Fill enter information. */
232      enter.api.major = FUSION_API_MAJOR_REQUIRED;
233      enter.api.minor = FUSION_API_MINOR_REQUIRED;
234      enter.fusion_id = 0;     /* Clear for check below. */
235 
236      /* Enter the fusion world. */
237      while (ioctl( fd, FUSION_ENTER, &enter )) {
238           if (errno != EINTR) {
239                D_PERROR( "Fusion/Init: Could not reenter world '%d'!\n", shared->world_index );
240                raise(5);
241           }
242      }
243 
244      /* Check for valid Fusion ID. */
245      if (!enter.fusion_id) {
246           D_ERROR( "Fusion/Init: Got no ID from FUSION_ENTER! Kernel module might be too old.\n" );
247           raise(5);
248      }
249 
250      D_DEBUG_AT( Fusion_Main, "  -> Fusion ID 0x%08lx\n", enter.fusion_id );
251 
252 
253      /* Fill fork information. */
254      fork.fusion_id = world->fusion_id;
255 
256      fusion_world_flush_calls( world, 1 );
257 
258      /* Fork within the fusion world. */
259      while (ioctl( fd, FUSION_FORK, &fork )) {
260           if (errno != EINTR) {
261                D_PERROR( "Fusion/Main: Could not fork in world '%d'!\n", shared->world_index );
262                raise(5);
263           }
264      }
265 
266      D_DEBUG_AT( Fusion_Main, "  -> Fusion ID 0x%08lx\n", fork.fusion_id );
267 
268      /* Get new fusion id back. */
269      world->fusion_id = fork.fusion_id;
270 
271      /* Close old file descriptor. */
272      close( world->fusion_fd );
273 
274      /* Write back new file descriptor. */
275      world->fusion_fd = fd;
276 
277 
278      D_DEBUG_AT( Fusion_Main, "  -> restarting dispatcher loop...\n" );
279 
280      /* Restart the dispatcher thread. FIXME: free old struct */
281      world->dispatch_loop = direct_thread_create( DTT_MESSAGING,
282                                                   fusion_dispatch_loop,
283                                                   world, "Fusion Dispatch" );
284      if (!world->dispatch_loop)
285           raise(5);
286 }
287 
288 static void
fusion_fork_handler_prepare(void)289 fusion_fork_handler_prepare( void )
290 {
291      int i;
292 
293      D_DEBUG_AT( Fusion_Main, "%s()\n", __FUNCTION__ );
294 
295      for (i=0; i<FUSION_MAX_WORLDS; i++) {
296           FusionWorld *world = fusion_worlds[i];
297 
298           if (!world)
299                continue;
300 
301           D_MAGIC_ASSERT( world, FusionWorld );
302 
303           if (world->fork_callback)
304                world->fork_callback( world->fork_action, FFS_PREPARE );
305      }
306 }
307 
308 static void
fusion_fork_handler_parent(void)309 fusion_fork_handler_parent( void )
310 {
311      int i;
312 
313      D_DEBUG_AT( Fusion_Main, "%s()\n", __FUNCTION__ );
314 
315      for (i=0; i<FUSION_MAX_WORLDS; i++) {
316           FusionWorld       *world = fusion_worlds[i];
317           FusionWorldShared *shared;
318 
319           if (!world)
320                continue;
321 
322           D_MAGIC_ASSERT( world, FusionWorld );
323 
324           shared = world->shared;
325 
326           D_MAGIC_ASSERT( shared, FusionWorldShared );
327 
328           if (world->fork_callback)
329                world->fork_callback( world->fork_action, FFS_PARENT );
330 
331           if (world->fork_action == FFA_FORK) {
332                /* Increase the shared reference counter. */
333                if (fusion_master( world ))
334                     shared->refs++;
335           }
336      }
337 }
338 
339 static void
fusion_fork_handler_child(void)340 fusion_fork_handler_child( void )
341 {
342      int i;
343 
344      D_DEBUG_AT( Fusion_Main, "%s()\n", __FUNCTION__ );
345 
346      for (i=0; i<FUSION_MAX_WORLDS; i++) {
347           FusionWorld *world = fusion_worlds[i];
348 
349           if (!world)
350                continue;
351 
352           D_MAGIC_ASSERT( world, FusionWorld );
353 
354           if (world->fork_callback)
355                world->fork_callback( world->fork_action, FFS_CHILD );
356 
357           switch (world->fork_action) {
358                default:
359                     D_BUG( "unknown fork action %d", world->fork_action );
360 
361                case FFA_CLOSE:
362                     D_DEBUG_AT( Fusion_Main, "  -> closing world %d\n", i );
363 
364                     /* Remove world from global list. */
365                     fusion_worlds[i] = NULL;
366 
367                     /* Unmap shared area. */
368                     munmap( world->shared, sizeof(FusionWorldShared) );
369 
370                     /* Close Fusion Kernel Device. */
371                     close( world->fusion_fd );
372 
373                     /* Free local world data. */
374                     D_MAGIC_CLEAR( world );
375                     D_FREE( world );
376 
377                     break;
378 
379                case FFA_FORK:
380                     D_DEBUG_AT( Fusion_Main, "  -> forking in world %d\n", i );
381 
382                     fusion_world_fork( world );
383 
384                     break;
385           }
386      }
387 }
388 
389 /**********************************************************************************************************************/
390 
391 static DirectResult
map_shared_root(void * shm_base,int world_index,bool master,FusionWorldShared ** ret_shared)392 map_shared_root( void               *shm_base,
393                  int                 world_index,
394                  bool                master,
395                  FusionWorldShared **ret_shared )
396 {
397      DirectResult ret = DR_OK;
398      int          fd;
399      void        *map;
400      char         tmpfs[FUSION_SHM_TMPFS_PATH_NAME_LEN];
401      char         root_file[FUSION_SHM_TMPFS_PATH_NAME_LEN+32];
402      int          flags = O_RDONLY;
403      int          prot  = PROT_READ;
404 
405      if (master || !fusion_config->secure_fusion) {
406           prot  |= PROT_WRITE;
407           flags  = O_RDWR;
408      }
409 
410      if (master)
411           flags |= O_CREAT | O_TRUNC;
412 
413      if (fusion_config->tmpfs) {
414           direct_snputs( tmpfs, fusion_config->tmpfs, FUSION_SHM_TMPFS_PATH_NAME_LEN );
415      }
416      else if (!fusion_find_tmpfs( tmpfs, FUSION_SHM_TMPFS_PATH_NAME_LEN )) {
417           D_ERROR( "Fusion/SHM: Could not find tmpfs mount point, falling back to /dev/shm!\n" );
418           direct_snputs( tmpfs, "/dev/shm", FUSION_SHM_TMPFS_PATH_NAME_LEN );
419      }
420 
421      snprintf( root_file, sizeof(root_file), "%s/fusion.%d", tmpfs, world_index );
422 
423      /* open the virtual file */
424      fd = open( root_file, flags, 0660 );
425      if (fd < 0) {
426           ret = errno2result(errno);
427           D_PERROR( "Fusion/SHM: Could not open shared memory file '%s'!\n", root_file );
428           return ret;
429      }
430 
431      if (fusion_config->shmfile_gid != (gid_t)-1) {
432           /* chgrp the SH_FILE dev entry */
433           if (fchown( fd, -1, fusion_config->shmfile_gid ) != 0)
434                D_WARN( "Fusion/SHM: Changing owner on %s failed... continuing on.", root_file );
435      }
436 
437      if (master) {
438           fchmod( fd, fusion_config->secure_fusion ? 0640 : 0660 );
439           ftruncate( fd, sizeof(FusionWorldShared) );
440      }
441 
442      D_DEBUG_AT( Fusion_Main, "  -> mmaping shared memory file... (%zu bytes)\n", sizeof(FusionWorldShared) );
443 
444 
445 
446      /* Map shared area. */
447      map = mmap( shm_base + 0x10000 * world_index, sizeof(FusionWorldShared),
448                  prot, MAP_FIXED | MAP_SHARED, fd, 0 );
449      if (map == MAP_FAILED) {
450           ret = errno2result(errno);
451           D_PERROR( "Fusion/Init: Mapping shared area failed!\n" );
452           goto out;
453      }
454 
455      *ret_shared = map;
456 
457 
458 out:
459      close( fd );
460 
461      return ret;
462 }
463 
464 /**********************************************************************************************************************/
465 
466 /*
467  * Enters a fusion world by joining or creating it.
468  *
469  * If <b>world</b> is negative, the next free index is used to create a new world.
470  * Otherwise the world with the specified index is joined or created.
471  */
472 DirectResult
fusion_enter(int world_index,int abi_version,FusionEnterRole role,FusionWorld ** ret_world)473 fusion_enter( int               world_index,
474               int               abi_version,
475               FusionEnterRole   role,
476               FusionWorld     **ret_world )
477 {
478      DirectResult       ret    = DR_OK;
479      int                fd     = -1;
480      FusionWorld       *world  = NULL;
481      FusionWorldShared *shared = NULL;
482      FusionEnter        enter;
483      char               buf1[20];
484      char               buf2[20];
485 
486      D_DEBUG_AT( Fusion_Main, "%s( %d, %d, %p )\n", __FUNCTION__, world_index, abi_version, ret_world );
487 
488      D_ASSERT( ret_world != NULL );
489 
490      if (world_index >= FUSION_MAX_WORLDS) {
491           D_ERROR( "Fusion/Init: World index %d exceeds maximum index %d!\n", world_index, FUSION_MAX_WORLDS - 1 );
492           return DR_INVARG;
493      }
494 
495      pthread_once( &fusion_init_once, init_once );
496 
497 
498      if (fusion_config->force_slave)
499           role = FER_SLAVE;
500 
501      direct_initialize();
502 
503      pthread_mutex_lock( &fusion_worlds_lock );
504 
505 
506      if (world_index < 0) {
507           if (role == FER_SLAVE) {
508                D_ERROR( "Fusion/Init: Slave role and a new world (index -1) was requested!\n" );
509                pthread_mutex_unlock( &fusion_worlds_lock );
510                return DR_INVARG;
511           }
512 
513           for (world_index=0; world_index<FUSION_MAX_WORLDS; world_index++) {
514                world = fusion_worlds[world_index];
515                if (world)
516                     break;
517 
518                snprintf( buf1, sizeof(buf1), "/dev/fusion%d", world_index );
519                snprintf( buf2, sizeof(buf2), "/dev/fusion/%d", world_index );
520 
521                /* Open Fusion Kernel Device. */
522                fd = direct_try_open( buf1, buf2, O_RDWR | O_NONBLOCK | O_EXCL, false );
523                if (fd < 0) {
524                     if (errno != EBUSY)
525                          D_PERROR( "Fusion/Init: Error opening '%s' and/or '%s'!\n", buf1, buf2 );
526                }
527                else
528                     break;
529           }
530      }
531      else {
532           world = fusion_worlds[world_index];
533           if (!world) {
534                int flags = O_RDWR | O_NONBLOCK;
535 
536                snprintf( buf1, sizeof(buf1), "/dev/fusion%d", world_index );
537                snprintf( buf2, sizeof(buf2), "/dev/fusion/%d", world_index );
538 
539                if (role == FER_MASTER)
540                     flags |= O_EXCL;
541                else if (role == FER_SLAVE)
542                     flags |= O_APPEND;
543 
544                /* Open Fusion Kernel Device. */
545                fd = direct_try_open( buf1, buf2, flags, true );
546           }
547      }
548 
549      /* Enter a world again? */
550      if (world) {
551           D_MAGIC_ASSERT( world, FusionWorld );
552           D_ASSERT( world->refs > 0 );
553 
554           /* Check the role again. */
555           switch (role) {
556                case FER_MASTER:
557                     if (world->fusion_id != FUSION_ID_MASTER) {
558                          D_ERROR( "Fusion/Init: Master role requested for a world (%d) "
559                                   "we're already slave in!\n", world_index );
560                          ret = DR_UNSUPPORTED;
561                          goto error;
562                     }
563                     break;
564 
565                case FER_SLAVE:
566                     if (world->fusion_id == FUSION_ID_MASTER) {
567                          D_ERROR( "Fusion/Init: Slave role requested for a world (%d) "
568                                   "we're already master in!\n", world_index );
569                          ret = DR_UNSUPPORTED;
570                          goto error;
571                     }
572                     break;
573 
574                case FER_ANY:
575                     break;
576           }
577 
578           shared = world->shared;
579 
580           D_MAGIC_ASSERT( shared, FusionWorldShared );
581 
582           if (shared->world_abi != abi_version) {
583                D_ERROR( "Fusion/Init: World ABI (%d) of world '%d' doesn't match own (%d)!\n",
584                         shared->world_abi, world_index, abi_version );
585                ret = DR_VERSIONMISMATCH;
586                goto error;
587           }
588 
589           world->refs++;
590 
591           pthread_mutex_unlock( &fusion_worlds_lock );
592 
593           D_DEBUG_AT( Fusion_Main, "  -> using existing world %p [%d]\n", world, world_index );
594 
595           /* Return the world. */
596           *ret_world = world;
597 
598           return DR_OK;
599      }
600 
601      if (fd < 0) {
602           D_PERROR( "Fusion/Init: Opening fusion device (world %d) as '%s' failed!\n", world_index,
603                     role == FER_ANY ? "any" : (role == FER_MASTER ? "master" : "slave")  );
604           ret = DR_INIT;
605           goto error;
606      }
607 
608      /* Drop "identity" when running another program. */
609      if (fcntl( fd, F_SETFD, FD_CLOEXEC ) < 0)
610           D_PERROR( "Fusion/Init: Setting FD_CLOEXEC flag failed!\n" );
611 
612      /* Fill enter information. */
613      enter.api.major = FUSION_API_MAJOR_REQUIRED;
614      enter.api.minor = FUSION_API_MINOR_REQUIRED;
615      enter.fusion_id = 0;     /* Clear for check below. */
616      enter.secure    = fusion_config->secure_fusion;
617 
618      /* Enter the fusion world. */
619      while (ioctl( fd, FUSION_ENTER, &enter )) {
620           if (errno != EINTR) {
621                D_PERROR( "Fusion/Init: Could not enter world '%d'!\n", world_index );
622                ret = DR_INIT;
623                goto error;
624           }
625      }
626 
627      /* Check for valid Fusion ID. */
628      if (!enter.fusion_id) {
629           D_ERROR( "Fusion/Init: Got no ID from FUSION_ENTER! Kernel module might be too old.\n" );
630           ret = DR_INIT;
631           goto error;
632      }
633 
634      D_DEBUG_AT( Fusion_Main, "  -> Fusion ID 0x%08lx\n", enter.fusion_id );
635 
636      /* Check slave role only, master is handled by O_EXCL earlier. */
637      if (role == FER_SLAVE && enter.fusion_id == FUSION_ID_MASTER) {
638           D_PERROR( "Fusion/Init: Entering world '%d' as a slave failed!\n", world_index );
639           ret = DR_UNSUPPORTED;
640           goto error;
641      }
642 
643 
644      unsigned long shm_base;
645 
646      if (ioctl( fd, FUSION_SHM_GET_BASE, &shm_base )) {
647           ret = errno2result(errno);
648           D_PERROR( "Fusion/Init: FUSION_SHM_GET_BASE failed!\n" );
649           goto error;
650      }
651 
652 
653      /* Map shared area. */
654      ret = map_shared_root( (void*) shm_base, world_index, enter.fusion_id == FUSION_ID_MASTER, &shared );
655      if (ret)
656           goto error;
657 
658      D_DEBUG_AT( Fusion_Main, "  -> shared area at %p, size %zu\n", shared, sizeof(FusionWorldShared) );
659 
660      /* Initialize shared data. */
661      if (enter.fusion_id == FUSION_ID_MASTER) {
662           /* Initialize reference counter. */
663           shared->refs = 1;
664 
665           /* Set ABI version. */
666           shared->world_abi = abi_version;
667 
668           /* Set the world index. */
669           shared->world_index = world_index;
670 
671           /* Set start time of world clock. */
672           gettimeofday( &shared->start_time, NULL );
673 
674           D_MAGIC_SET( shared, FusionWorldShared );
675      }
676      else {
677           D_MAGIC_ASSERT( shared, FusionWorldShared );
678 
679           /* Check ABI version. */
680           if (shared->world_abi != abi_version) {
681                D_ERROR( "Fusion/Init: World ABI (%d) doesn't match own (%d)!\n",
682                         shared->world_abi, abi_version );
683                ret = DR_VERSIONMISMATCH;
684                goto error;
685           }
686      }
687 
688      /* Synchronize to world clock. */
689      direct_clock_set_start( &shared->start_time );
690 
691 
692      /* Allocate local data. */
693      world = D_CALLOC( 1, sizeof(FusionWorld) );
694      if (!world) {
695           ret = D_OOM();
696           goto error;
697      }
698 
699      /* Initialize local data. */
700      world->refs      = 1;
701      world->shared    = shared;
702      world->fusion_fd = fd;
703      world->fusion_id = enter.fusion_id;
704 
705      direct_mutex_init( &world->reactor_nodes_lock );
706      direct_mutex_init( &world->bins_lock );
707 
708      D_MAGIC_SET( world, FusionWorld );
709 
710      fusion_worlds[world_index] = world;
711 
712      /* Initialize shared memory part. */
713      ret = fusion_shm_init( world );
714      if (ret)
715           goto error2;
716 
717 
718      D_DEBUG_AT( Fusion_Main, "  -> initializing other parts...\n" );
719 
720      /* Initialize other parts. */
721      if (enter.fusion_id == FUSION_ID_MASTER) {
722           fusion_skirmish_init( &shared->arenas_lock, "Fusion Arenas", world );
723           fusion_skirmish_init( &shared->reactor_globals, "Fusion Reactor Globals", world );
724 
725           fusion_skirmish_add_permissions( &shared->arenas_lock, 0, FUSION_SKIRMISH_PERMIT_PREVAIL | FUSION_SKIRMISH_PERMIT_DISMISS );
726           fusion_skirmish_add_permissions( &shared->reactor_globals, 0, FUSION_SKIRMISH_PERMIT_PREVAIL | FUSION_SKIRMISH_PERMIT_DISMISS );
727 
728           /* Create the main pool. */
729           ret = fusion_shm_pool_create( world, "Fusion Main Pool", 0x1000000,
730                                         fusion_config->debugshm, &shared->main_pool );
731           if (ret)
732                goto error3;
733      }
734 
735 
736      D_DEBUG_AT( Fusion_Main, "  -> starting dispatcher loop...\n" );
737 
738      /* Start the dispatcher thread. */
739      world->dispatch_loop = direct_thread_create( DTT_MESSAGING,
740                                                   fusion_dispatch_loop,
741                                                   world, "Fusion Dispatch" );
742      if (!world->dispatch_loop) {
743           ret = DR_FAILURE;
744           goto error4;
745      }
746 
747 
748      /* Let others enter the world. */
749      if (enter.fusion_id == FUSION_ID_MASTER) {
750           D_DEBUG_AT( Fusion_Main, "  -> unblocking world...\n" );
751 
752           while (ioctl( fd, FUSION_UNBLOCK )) {
753                if (errno != EINTR) {
754                     D_PERROR( "Fusion/Init: Could not unblock world!\n" );
755                     ret = DR_FUSION;
756                     goto error4;
757                }
758           }
759      }
760 
761      D_DEBUG_AT( Fusion_Main, "  -> done. (%p)\n", world );
762 
763      pthread_mutex_unlock( &fusion_worlds_lock );
764 
765      /* Return the fusion world. */
766      *ret_world = world;
767 
768      return DR_OK;
769 
770 
771 error4:
772      if (world->dispatch_loop)
773           direct_thread_destroy( world->dispatch_loop );
774 
775      if (enter.fusion_id == FUSION_ID_MASTER)
776           fusion_shm_pool_destroy( world, shared->main_pool );
777 
778 error3:
779      if (enter.fusion_id == FUSION_ID_MASTER) {
780           fusion_skirmish_destroy( &shared->arenas_lock );
781           fusion_skirmish_destroy( &shared->reactor_globals );
782      }
783 
784      fusion_shm_deinit( world );
785 
786 
787 error2:
788      fusion_worlds[world_index] = world;
789 
790      D_MAGIC_CLEAR( world );
791 
792      D_FREE( world );
793 
794 error:
795      if (shared && shared != MAP_FAILED) {
796           if (enter.fusion_id == FUSION_ID_MASTER)
797                D_MAGIC_CLEAR( shared );
798 
799           munmap( shared, sizeof(FusionWorldShared) );
800      }
801 
802      if (fd != -1)
803           close( fd );
804 
805      pthread_mutex_unlock( &fusion_worlds_lock );
806 
807      direct_shutdown();
808 
809      return ret;
810 }
811 
812 DirectResult
fusion_stop_dispatcher(FusionWorld * world,bool emergency)813 fusion_stop_dispatcher( FusionWorld *world,
814                         bool         emergency )
815 {
816      if (!emergency) {
817           fusion_sync( world );
818 
819           direct_thread_lock( world->dispatch_loop );
820      }
821 
822      world->dispatch_stop = true;
823 
824      if (!emergency) {
825           direct_thread_unlock( world->dispatch_loop );
826 
827           fusion_sync( world );
828      }
829 
830      return DR_OK;
831 }
832 
833 /*
834  * Exits the fusion world.
835  *
836  * If 'emergency' is true the function won't join but kill the dispatcher thread.
837  */
838 DirectResult
fusion_exit(FusionWorld * world,bool emergency)839 fusion_exit( FusionWorld *world,
840              bool         emergency )
841 {
842      FusionWorldShared *shared;
843 
844      D_DEBUG_AT( Fusion_Main, "%s( %p, %semergency )\n", __FUNCTION__, world, emergency ? "" : "no " );
845 
846      D_MAGIC_ASSERT( world, FusionWorld );
847 
848      shared = world->shared;
849 
850      D_MAGIC_ASSERT( shared, FusionWorldShared );
851 
852 
853      pthread_mutex_lock( &fusion_worlds_lock );
854 
855      D_ASSERT( world->refs > 0 );
856 
857      if (--world->refs) {
858           pthread_mutex_unlock( &fusion_worlds_lock );
859           return DR_OK;
860      }
861 
862      if (!emergency) {
863           int               foo;
864           FusionSendMessage msg;
865 
866           /* Wake up the read loop thread. */
867           msg.fusion_id = world->fusion_id;
868           msg.msg_id    = 0;
869           msg.msg_data  = &foo;
870           msg.msg_size  = sizeof(foo);
871 
872           fusion_world_flush_calls( world, 1 );
873 
874           while (ioctl( world->fusion_fd, FUSION_SEND_MESSAGE, &msg ) < 0) {
875                if (errno != EINTR) {
876                     D_PERROR( "FUSION_SEND_MESSAGE" );
877                     direct_thread_cancel( world->dispatch_loop );
878                     break;
879                }
880           }
881 
882           /* Wait for its termination. */
883           direct_thread_join( world->dispatch_loop );
884      }
885 
886      direct_thread_destroy( world->dispatch_loop );
887 
888      /* Master has to deinitialize shared data. */
889      if (fusion_master( world )) {
890           shared->refs--;
891           if (shared->refs == 0) {
892                fusion_skirmish_destroy( &shared->reactor_globals );
893                fusion_skirmish_destroy( &shared->arenas_lock );
894 
895                fusion_shm_pool_destroy( world, shared->main_pool );
896 
897                /* Deinitialize shared memory. */
898                fusion_shm_deinit( world );
899           }
900      }
901      else {
902           /* Leave shared memory. */
903           fusion_shm_deinit( world );
904      }
905 
906      /* Reset local dispatch nodes. */
907      _fusion_reactor_free_all( world );
908 
909 
910      /* Remove world from global list. */
911      fusion_worlds[shared->world_index] = NULL;
912 
913 
914      /* Unmap shared area. */
915      if (fusion_master( world ) && shared->refs == 0) {
916           char         tmpfs[FUSION_SHM_TMPFS_PATH_NAME_LEN];
917           char         root_file[FUSION_SHM_TMPFS_PATH_NAME_LEN+32];
918 
919           if (fusion_config->tmpfs)
920                direct_snputs( tmpfs, fusion_config->tmpfs, FUSION_SHM_TMPFS_PATH_NAME_LEN );
921           else if (!fusion_find_tmpfs( tmpfs, FUSION_SHM_TMPFS_PATH_NAME_LEN ))
922                direct_snputs( tmpfs, "/dev/shm", FUSION_SHM_TMPFS_PATH_NAME_LEN );
923 
924           snprintf( root_file, sizeof(root_file), "%s/fusion.%d", tmpfs, shared->world_index );
925 
926           if (unlink( root_file ) < 0)
927                D_PERROR( "Fusion/Main: could not unlink shared memory file (%s)!\n", root_file );
928 
929           D_MAGIC_CLEAR( shared );
930      }
931 
932      munmap( shared, sizeof(FusionWorldShared) );
933 
934 
935      /* Close Fusion Kernel Device. */
936      close( world->fusion_fd );
937 
938 
939      /* Free local world data. */
940      D_MAGIC_CLEAR( world );
941 
942      D_FREE( world );
943 
944 
945      pthread_mutex_unlock( &fusion_worlds_lock );
946 
947      direct_shutdown();
948 
949      return DR_OK;
950 }
951 
952 /*
953  * Sends a signal to one or more fusionees and optionally waits
954  * for their processes to terminate.
955  *
956  * A fusion_id of zero means all fusionees but the calling one.
957  * A timeout of zero means infinite waiting while a negative value
958  * means no waiting at all.
959  */
960 DirectResult
fusion_kill(FusionWorld * world,FusionID fusion_id,int signal,int timeout_ms)961 fusion_kill( FusionWorld *world,
962              FusionID     fusion_id,
963              int          signal,
964              int          timeout_ms )
965 {
966      FusionKill param;
967 
968      D_MAGIC_ASSERT( world, FusionWorld );
969 
970      param.fusion_id  = fusion_id;
971      param.signal     = signal;
972      param.timeout_ms = timeout_ms;
973 
974      fusion_world_flush_calls( world, 1 );
975 
976      while (ioctl( world->fusion_fd, FUSION_KILL, &param )) {
977           switch (errno) {
978                case EINTR:
979                     continue;
980                case ETIMEDOUT:
981                     return DR_TIMEOUT;
982                default:
983                     break;
984           }
985 
986           D_PERROR ("FUSION_KILL");
987 
988           return DR_FAILURE;
989      }
990 
991      return DR_OK;
992 }
993 
994 const char *
fusion_get_tmpfs(FusionWorld * world)995 fusion_get_tmpfs( FusionWorld *world )
996 {
997      D_MAGIC_ASSERT( world, FusionWorld );
998      D_MAGIC_ASSERT( world->shared, FusionWorldShared );
999 
1000      return world->shared->shm.tmpfs;
1001 }
1002 
1003 /**********************************************************************************************************************/
1004 
1005 static void *
fusion_dispatch_loop(DirectThread * thread,void * arg)1006 fusion_dispatch_loop( DirectThread *thread, void *arg )
1007 {
1008      int          len = 0;
1009      int          result;
1010      char         buf[FUSION_MESSAGE_SIZE];
1011      FusionWorld *world = arg;
1012 
1013      D_DEBUG_AT( Fusion_Main_Dispatch, "%s() running...\n", __FUNCTION__ );
1014 
1015      while (true) {
1016           char *buf_p = buf;
1017 
1018           D_MAGIC_ASSERT( world, FusionWorld );
1019 
1020           len = read( world->fusion_fd, buf, FUSION_MESSAGE_SIZE );
1021           if (len < 0)
1022                break;
1023 
1024           D_DEBUG_AT( Fusion_Main_Dispatch, "  -> got %d bytes...\n", len );
1025 
1026           direct_thread_lock( world->dispatch_loop );
1027 
1028           if (world->dispatch_stop) {
1029                D_DEBUG_AT( Fusion_Main_Dispatch, "  -> IGNORING (dispatch_stop!)\n" );
1030           }
1031           else {
1032                while (buf_p < buf + len) {
1033                     FusionReadMessage *header = (FusionReadMessage*) buf_p;
1034                     void              *data   = buf_p + sizeof(FusionReadMessage);
1035 
1036                     if (world->dispatch_stop) {
1037                          D_DEBUG_AT( Fusion_Main_Dispatch, "  -> ABORTING (dispatch_stop!)\n" );
1038                          break;
1039                     }
1040 
1041                     D_MAGIC_ASSERT( world, FusionWorld );
1042                     D_ASSERT( (buf + len - buf_p) >= sizeof(FusionReadMessage) );
1043 
1044                     switch (header->msg_type) {
1045                          case FMT_SEND:
1046                               D_DEBUG_AT( Fusion_Main_Dispatch, "  -> FMT_SEND!\n" );
1047                               break;
1048                          case FMT_CALL:
1049                               D_DEBUG_AT( Fusion_Main_Dispatch, "  -> FMT_CALL...\n" );
1050                               _fusion_call_process( world, header->msg_id, data,
1051                                                     (header->msg_size != sizeof(FusionCallMessage))
1052                                                     ? data + sizeof(FusionCallMessage) : NULL );
1053                               break;
1054                          case FMT_REACTOR:
1055                               D_DEBUG_AT( Fusion_Main_Dispatch, "  -> FMT_REACTOR...\n" );
1056                               _fusion_reactor_process_message( world, header->msg_id, header->msg_channel, data );
1057                               break;
1058                          case FMT_SHMPOOL:
1059                               D_DEBUG_AT( Fusion_Main_Dispatch, "  -> FMT_SHMPOOL...\n" );
1060                               _fusion_shmpool_process( world, header->msg_id, data );
1061                               break;
1062                          case FMT_CALL3:
1063                               D_DEBUG_AT( Fusion_Main_Dispatch, "  -> FMT_CALL3...\n" );
1064                               _fusion_call_process3( world, header->msg_id, data,
1065                                                      (header->msg_size != sizeof(FusionCallMessage3))
1066                                                      ?data + sizeof(FusionCallMessage3) : NULL );
1067                               break;
1068                          case FMT_LEAVE:
1069                               D_DEBUG_AT( Fusion_Main_Dispatch, "  -> FMT_LEAVE...\n" );
1070 
1071                               if (world->leave_callback)
1072                                    world->leave_callback( world, *((FusionID*)data), world->leave_ctx );
1073                               break;
1074                          default:
1075                               D_DEBUG( "Fusion/Receiver: discarding message of unknown type '%d'\n",
1076                                        header->msg_type );
1077                               break;
1078                     }
1079 
1080                     buf_p = data + ((header->msg_size + 3) & ~3);
1081                }
1082           }
1083 
1084           direct_thread_unlock( world->dispatch_loop );
1085 
1086           if (!world->refs) {
1087                D_DEBUG_AT( Fusion_Main_Dispatch, "  -> good bye!\n" );
1088                return NULL;
1089           }
1090      }
1091 
1092      return NULL;
1093 }
1094 
1095 /**********************************************************************************************************************/
1096 
1097 DirectResult
fusion_get_fusionee_path(const FusionWorld * world,FusionID fusion_id,char * buf,size_t buf_size,size_t * ret_size)1098 fusion_get_fusionee_path( const FusionWorld *world,
1099                           FusionID           fusion_id,
1100                           char              *buf,
1101                           size_t             buf_size,
1102                           size_t            *ret_size )
1103 {
1104      FusionGetFusioneeInfo info;
1105      size_t                len;
1106 
1107      D_ASSERT( world != NULL );
1108      D_ASSERT( buf != NULL );
1109      D_ASSERT( ret_size != NULL );
1110 
1111      info.fusion_id = fusion_id;
1112 
1113      while (ioctl( world->fusion_fd, FUSION_GET_FUSIONEE_INFO, &info ) < 0) {
1114           switch (errno) {
1115                case EINTR:
1116                     continue;
1117 
1118                default:
1119                     break;
1120           }
1121 
1122           D_PERROR( "Fusion: FUSION_GET_FUSIONEE_INFO failed!\n" );
1123 
1124           return DR_FUSION;
1125      }
1126 
1127      len = strlen( info.exe_file ) + 1;
1128 
1129      if (len > buf_size) {
1130           *ret_size = len;
1131           return DR_LIMITEXCEEDED;
1132      }
1133 
1134      memcpy( buf, info.exe_file, len );
1135 
1136      *ret_size = len;
1137 
1138      return DR_OK;
1139 }
1140 
1141 #else /* FUSION_BUILD_KERNEL */
1142 
1143 #include <dirent.h>
1144 
1145 #include <direct/system.h>
1146 
1147 typedef struct {
1148      DirectLink   link;
1149 
1150      FusionRef   *ref;
1151 
1152      int          count;
1153 } __FusioneeRef;
1154 
1155 typedef struct {
1156      DirectLink   link;
1157 
1158      FusionID     id;
1159      pid_t        pid;
1160 
1161      DirectLink  *refs;
1162 } __Fusionee;
1163 
1164 
1165 /**********************************************************************************************************************/
1166 
1167 static DirectResult
_fusion_add_fusionee(FusionWorld * world,FusionID fusion_id)1168 _fusion_add_fusionee( FusionWorld *world, FusionID fusion_id )
1169 {
1170      DirectResult       ret;
1171      FusionWorldShared *shared;
1172      __Fusionee        *fusionee;
1173 
1174      D_DEBUG_AT( Fusion_Main, "%s( %p, %lu )\n", __FUNCTION__, world, fusion_id );
1175 
1176      D_MAGIC_ASSERT( world, FusionWorld );
1177 
1178      shared = world->shared;
1179 
1180      D_MAGIC_ASSERT( shared, FusionWorldShared );
1181 
1182      fusionee = SHCALLOC( shared->main_pool, 1, sizeof(__Fusionee) );
1183      if (!fusionee)
1184           return D_OOSHM();
1185 
1186      fusionee->id  = fusion_id;
1187      fusionee->pid = direct_gettid();
1188 
1189      ret = fusion_skirmish_prevail( &shared->fusionees_lock );
1190      if (ret) {
1191           SHFREE( shared->main_pool, fusionee );
1192           return ret;
1193      }
1194 
1195      direct_list_append( &shared->fusionees, &fusionee->link );
1196 
1197      fusion_skirmish_dismiss( &shared->fusionees_lock );
1198 
1199      /* Set local pointer. */
1200      world->fusionee = fusionee;
1201 
1202      return DR_OK;
1203 }
1204 
1205 void
_fusion_add_local(FusionWorld * world,FusionRef * ref,int add)1206 _fusion_add_local( FusionWorld *world, FusionRef *ref, int add )
1207 {
1208      FusionWorldShared *shared;
1209      __Fusionee        *fusionee;
1210      __FusioneeRef     *fusionee_ref;
1211 
1212      //D_DEBUG_AT( Fusion_Main, "%s( %p, %p, %d )\n", __FUNCTION__, world, ref, add );
1213 
1214      D_ASSERT( ref != NULL );
1215      D_MAGIC_ASSERT( world, FusionWorld );
1216 
1217      shared = world->shared;
1218      D_MAGIC_ASSERT( shared, FusionWorldShared );
1219 
1220      fusionee = world->fusionee;
1221      D_ASSERT( fusionee != NULL );
1222 
1223      direct_list_foreach (fusionee_ref, fusionee->refs) {
1224           if (fusionee_ref->ref == ref)
1225                break;
1226      }
1227 
1228      if (fusionee_ref) {
1229           fusionee_ref->count += add;
1230 
1231           //D_DEBUG_AT( Fusion_Main, " -> refs = %d\n", fusionee_ref->count );
1232 
1233           if (fusionee_ref->count == 0) {
1234                direct_list_remove( &fusionee->refs, &fusionee_ref->link );
1235 
1236                SHFREE( shared->main_pool, fusionee_ref );
1237           }
1238      }
1239      else {
1240           if (add <= 0) /* called from _fusion_remove_fusionee() */
1241                return;
1242 
1243           //D_DEBUG_AT( Fusion_Main, " -> new ref\n" );
1244 
1245           fusionee_ref = SHCALLOC( shared->main_pool, 1, sizeof(__FusioneeRef) );
1246           if (!fusionee_ref) {
1247                D_OOSHM();
1248                return;
1249           }
1250 
1251           fusionee_ref->ref   = ref;
1252           fusionee_ref->count = add;
1253 
1254           direct_list_prepend( &fusionee->refs, &fusionee_ref->link );
1255      }
1256 }
1257 
1258 void
_fusion_check_locals(FusionWorld * world,FusionRef * ref)1259 _fusion_check_locals( FusionWorld *world, FusionRef *ref )
1260 {
1261      FusionWorldShared *shared;
1262      __Fusionee        *fusionee;
1263      __FusioneeRef     *fusionee_ref, *temp;
1264      DirectLink        *list = NULL;
1265 
1266      D_DEBUG_AT( Fusion_Main, "%s( %p, %p )\n", __FUNCTION__, world, ref );
1267 
1268      D_ASSERT( ref != NULL );
1269 
1270      D_MAGIC_ASSERT( world, FusionWorld );
1271 
1272      shared = world->shared;
1273 
1274      D_MAGIC_ASSERT( shared, FusionWorldShared );
1275 
1276      if (fusion_skirmish_prevail( &shared->fusionees_lock ))
1277           return;
1278 
1279      direct_list_foreach (fusionee, shared->fusionees) {
1280           if (fusionee->id == world->fusion_id)
1281                continue;
1282 
1283           direct_list_foreach (fusionee_ref, fusionee->refs) {
1284                if (fusionee_ref->ref == ref) {
1285                     if (kill( fusionee->pid, 0 ) < 0 && errno == ESRCH) {
1286                          direct_list_remove( &fusionee->refs, &fusionee_ref->link );
1287                          direct_list_append( &list, &fusionee_ref->link );
1288                     }
1289                     break;
1290                }
1291           }
1292      }
1293 
1294      fusion_skirmish_dismiss( &shared->fusionees_lock );
1295 
1296      direct_list_foreach_safe (fusionee_ref, temp, list) {
1297           _fusion_ref_change( ref, -fusionee_ref->count, false );
1298 
1299           SHFREE( shared->main_pool, fusionee_ref );
1300      }
1301 }
1302 
1303 void
_fusion_remove_all_locals(FusionWorld * world,const FusionRef * ref)1304 _fusion_remove_all_locals( FusionWorld *world, const FusionRef *ref )
1305 {
1306      FusionWorldShared *shared;
1307      __Fusionee        *fusionee;
1308      __FusioneeRef     *fusionee_ref, *temp;
1309 
1310      D_DEBUG_AT( Fusion_Main, "%s( %p, %p )\n", __FUNCTION__, world, ref );
1311 
1312      D_ASSERT( ref != NULL );
1313 
1314      D_MAGIC_ASSERT( world, FusionWorld );
1315 
1316      shared = world->shared;
1317 
1318      D_MAGIC_ASSERT( shared, FusionWorldShared );
1319 
1320      if (fusion_skirmish_prevail( &shared->fusionees_lock ))
1321           return;
1322 
1323      direct_list_foreach (fusionee, shared->fusionees) {
1324           direct_list_foreach_safe (fusionee_ref, temp, fusionee->refs) {
1325                if (fusionee_ref->ref == ref) {
1326                     direct_list_remove( &fusionee->refs, &fusionee_ref->link );
1327 
1328                     SHFREE( shared->main_pool, fusionee_ref );
1329                }
1330           }
1331      }
1332 
1333      fusion_skirmish_dismiss( &shared->fusionees_lock );
1334 }
1335 
1336 static void
_fusion_remove_fusionee(FusionWorld * world,FusionID fusion_id)1337 _fusion_remove_fusionee( FusionWorld *world, FusionID fusion_id )
1338 {
1339      FusionWorldShared *shared;
1340      __Fusionee        *fusionee;
1341      __FusioneeRef     *fusionee_ref, *temp;
1342 
1343      D_DEBUG_AT( Fusion_Main, "%s( %p, %lu )\n", __FUNCTION__, world, fusion_id );
1344 
1345      D_MAGIC_ASSERT( world, FusionWorld );
1346 
1347      shared = world->shared;
1348 
1349      D_MAGIC_ASSERT( shared, FusionWorldShared );
1350 
1351      fusion_skirmish_prevail( &shared->fusionees_lock );
1352 
1353      if (fusion_id == world->fusion_id) {
1354           fusionee = world->fusionee;
1355      }
1356      else {
1357           direct_list_foreach (fusionee, shared->fusionees) {
1358                if (fusionee->id == fusion_id)
1359                     break;
1360           }
1361      }
1362 
1363      if (!fusionee) {
1364           D_DEBUG_AT( Fusion_Main, "Fusionee %lu not found!\n", fusion_id );
1365           fusion_skirmish_dismiss( &shared->fusionees_lock );
1366           return;
1367      }
1368 
1369      direct_list_remove( &shared->fusionees, &fusionee->link );
1370 
1371      fusion_skirmish_dismiss( &shared->fusionees_lock );
1372 
1373      direct_list_foreach_safe (fusionee_ref, temp, fusionee->refs) {
1374           direct_list_remove( &fusionee->refs, &fusionee_ref->link );
1375 
1376           _fusion_ref_change( fusionee_ref->ref, -fusionee_ref->count, false );
1377 
1378           SHFREE( shared->main_pool, fusionee_ref );
1379      }
1380 
1381      SHFREE( shared->main_pool, fusionee );
1382 }
1383 
1384 /**********************************************************************************************************************/
1385 
1386 DirectResult
_fusion_send_message(int fd,const void * msg,size_t msg_size,struct sockaddr_un * addr)1387 _fusion_send_message( int                 fd,
1388                       const void         *msg,
1389                       size_t              msg_size,
1390                       struct sockaddr_un *addr )
1391 {
1392      socklen_t len = sizeof(struct sockaddr_un);
1393 
1394      D_ASSERT( msg != NULL );
1395 
1396      if (!addr) {
1397           addr = alloca( sizeof(struct sockaddr_un) );
1398           getsockname( fd, (struct sockaddr*)addr, &len );
1399      }
1400 
1401      while (sendto( fd, msg, msg_size, 0, (struct sockaddr*)addr, len ) < 0) {
1402           switch (errno) {
1403                case EINTR:
1404                     continue;
1405                case ECONNREFUSED:
1406                     return DR_FUSION;
1407                default:
1408                     break;
1409           }
1410 
1411           D_PERROR( "Fusion: sendto()\n" );
1412 
1413           return DR_IO;
1414      }
1415 
1416      return DR_OK;
1417 }
1418 
1419 DirectResult
_fusion_recv_message(int fd,void * msg,size_t msg_size,struct sockaddr_un * addr)1420 _fusion_recv_message( int                 fd,
1421                       void               *msg,
1422                       size_t              msg_size,
1423                       struct sockaddr_un *addr )
1424 {
1425      socklen_t len = addr ? sizeof(struct sockaddr_un) : 0;
1426 
1427      D_ASSERT( msg != NULL );
1428 
1429      while (recvfrom( fd, msg, msg_size, 0, (struct sockaddr*)addr, &len ) < 0) {
1430           switch (errno) {
1431                case EINTR:
1432                     continue;
1433                case ECONNREFUSED:
1434                     return DR_FUSION;
1435                default:
1436                     break;
1437           }
1438 
1439           D_PERROR( "Fusion: recvfrom()\n" );
1440 
1441           return DR_IO;
1442      }
1443 
1444      return DR_OK;
1445 }
1446 
1447 /**********************************************************************************************************************/
1448 
1449 static void
fusion_fork_handler_prepare(void)1450 fusion_fork_handler_prepare( void )
1451 {
1452      int i;
1453 
1454      D_DEBUG_AT( Fusion_Main, "%s()\n", __FUNCTION__ );
1455 
1456      for (i=0; i<FUSION_MAX_WORLDS; i++) {
1457           FusionWorld *world = fusion_worlds[i];
1458 
1459           if (!world)
1460                continue;
1461 
1462           D_MAGIC_ASSERT( world, FusionWorld );
1463 
1464           if (world->fork_callback)
1465                world->fork_callback( world->fork_action, FFS_PREPARE );
1466      }
1467 }
1468 
1469 static void
fusion_fork_handler_parent(void)1470 fusion_fork_handler_parent( void )
1471 {
1472      int i;
1473 
1474      D_DEBUG_AT( Fusion_Main, "%s()\n", __FUNCTION__ );
1475 
1476      for (i=0; i<FUSION_MAX_WORLDS; i++) {
1477           FusionWorld        *world = fusion_worlds[i];
1478            FusionWorldShared *shared;
1479 
1480           if (!world)
1481                continue;
1482 
1483           D_MAGIC_ASSERT( world, FusionWorld );
1484 
1485           shared = world->shared;
1486 
1487           D_MAGIC_ASSERT( shared, FusionWorldShared );
1488 
1489           if (world->fork_callback)
1490                world->fork_callback( world->fork_action, FFS_PARENT );
1491 
1492           if (world->fork_action == FFA_FORK) {
1493                /* Increase the shared reference counter. */
1494                if (fusion_master( world ))
1495                     shared->refs++;
1496 
1497                /* Cancel the dispatcher to prevent conflicts. */
1498                direct_thread_cancel( world->dispatch_loop );
1499           }
1500      }
1501 }
1502 
1503 static void
fusion_fork_handler_child(void)1504 fusion_fork_handler_child( void )
1505 {
1506      int i;
1507 
1508      D_DEBUG_AT( Fusion_Main, "%s()\n", __FUNCTION__ );
1509 
1510      for (i=0; i<FUSION_MAX_WORLDS; i++) {
1511           FusionWorld       *world = fusion_worlds[i];
1512           FusionWorldShared *shared;
1513 
1514           if (!world)
1515                continue;
1516 
1517           D_MAGIC_ASSERT( world, FusionWorld );
1518 
1519           shared = world->shared;
1520 
1521           D_MAGIC_ASSERT( shared, FusionWorldShared );
1522 
1523           if (world->fork_callback)
1524                world->fork_callback( world->fork_action, FFS_CHILD );
1525 
1526           switch (world->fork_action) {
1527                default:
1528                     D_BUG( "unknown fork action %d", world->fork_action );
1529 
1530                case FFA_CLOSE:
1531                     D_DEBUG_AT( Fusion_Main, "  -> closing world %d\n", i );
1532 
1533                     /* Remove world from global list. */
1534                     fusion_worlds[i] = NULL;
1535 
1536                     /* Unmap shared area. */
1537                     munmap( world->shared, sizeof(FusionWorldShared) );
1538 
1539                     /* Close socket. */
1540                     close( world->fusion_fd );
1541 
1542                     /* Free local world data. */
1543                     D_MAGIC_CLEAR( world );
1544                     D_FREE( world );
1545 
1546                     break;
1547 
1548                case FFA_FORK: {
1549                     __Fusionee    *fusionee;
1550                     __FusioneeRef *fusionee_ref;
1551 
1552                     D_DEBUG_AT( Fusion_Main, "  -> forking in world %d\n", i );
1553 
1554                     fusionee = world->fusionee;
1555 
1556                     D_DEBUG_AT( Fusion_Main, "  -> duplicating fusion id %lu\n", world->fusion_id );
1557 
1558                     fusion_skirmish_prevail( &shared->fusionees_lock );
1559 
1560                     if (_fusion_add_fusionee( world, world->fusion_id )) {
1561                          fusion_skirmish_dismiss( &shared->fusionees_lock );
1562                          raise( SIGTRAP );
1563                     }
1564 
1565                     D_DEBUG_AT( Fusion_Main, "  -> duplicating local refs...\n" );
1566 
1567                     direct_list_foreach (fusionee_ref, fusionee->refs) {
1568                          __FusioneeRef *new_ref;
1569 
1570                          new_ref = SHCALLOC( shared->main_pool, 1, sizeof(__FusioneeRef) );
1571                          if (!new_ref) {
1572                               D_OOSHM();
1573                               fusion_skirmish_dismiss( &shared->fusionees_lock );
1574                               raise( SIGTRAP );
1575                          }
1576 
1577                          new_ref->ref   = fusionee_ref->ref;
1578                          new_ref->count = fusionee_ref->count;
1579                          /* Avoid locking. */
1580                          new_ref->ref->multi.builtin.local += new_ref->count;
1581 
1582                          direct_list_append( &((__Fusionee*)world->fusionee)->refs, &new_ref->link );
1583                     }
1584 
1585                     fusion_skirmish_dismiss( &shared->fusionees_lock );
1586 
1587                     D_DEBUG_AT( Fusion_Main, "  -> restarting dispatcher loop...\n" );
1588 
1589                     /* Restart the dispatcher thread. FIXME: free old struct */
1590                     world->dispatch_loop = direct_thread_create( DTT_MESSAGING,
1591                                                                  fusion_dispatch_loop,
1592                                                                  world, "Fusion Dispatch" );
1593                     if (!world->dispatch_loop)
1594                          raise( SIGTRAP );
1595 
1596                }    break;
1597           }
1598      }
1599 }
1600 
1601 /**********************************************************************************************************************/
1602 
1603 /*
1604  * Enters a fusion world by joining or creating it.
1605  *
1606  * If <b>world</b> is negative, the next free index is used to create a new world.
1607  * Otherwise the world with the specified index is joined or created.
1608  */
1609 DirectResult
fusion_enter(int world_index,int abi_version,FusionEnterRole role,FusionWorld ** ret_world)1610 fusion_enter( int               world_index,
1611               int               abi_version,
1612               FusionEnterRole   role,
1613               FusionWorld     **ret_world )
1614 {
1615      DirectResult        ret     = DR_OK;
1616      int                 fd      = -1;
1617      FusionID            id      = -1;
1618      FusionWorld        *world   = NULL;
1619      FusionWorldShared  *shared  = MAP_FAILED;
1620      struct sockaddr_un  addr;
1621      char                buf[128];
1622      int                 len, err;
1623 
1624      D_DEBUG_AT( Fusion_Main, "%s( %d, %d, %p )\n", __FUNCTION__, world_index, abi_version, ret_world );
1625 
1626      D_ASSERT( ret_world != NULL );
1627 
1628      if (world_index >= FUSION_MAX_WORLDS) {
1629           D_ERROR( "Fusion/Init: World index %d exceeds maximum index %d!\n", world_index, FUSION_MAX_WORLDS - 1 );
1630           return DR_INVARG;
1631      }
1632 
1633      if (fusion_config->force_slave)
1634           role = FER_SLAVE;
1635 
1636      pthread_once( &fusion_init_once, init_once );
1637 
1638      direct_initialize();
1639 
1640      fd = socket( PF_LOCAL, SOCK_RAW, 0 );
1641      if (fd < 0) {
1642           D_PERROR( "Fusion/Init: Error creating local socket!\n" );
1643           return DR_IO;
1644      }
1645 
1646      /* Set close-on-exec flag. */
1647      if (fcntl( fd, F_SETFD, FD_CLOEXEC ) < 0)
1648           D_PERROR( "Fusion/Init: Couldn't set close-on-exec flag!\n" );
1649 
1650      pthread_mutex_lock( &fusion_worlds_lock );
1651 
1652      addr.sun_family = AF_UNIX;
1653 
1654      if (world_index < 0) {
1655           if (role == FER_SLAVE) {
1656                D_ERROR( "Fusion/Init: Slave role and a new world (index -1) was requested!\n" );
1657                pthread_mutex_unlock( &fusion_worlds_lock );
1658                close( fd );
1659                return DR_INVARG;
1660           }
1661 
1662           for (world_index=0; world_index<FUSION_MAX_WORLDS; world_index++) {
1663                if (fusion_worlds[world_index])
1664                     continue;
1665 
1666                len = snprintf( addr.sun_path, sizeof(addr.sun_path), "/tmp/.fusion-%d/", world_index );
1667                /* Make socket directory if it doesn't exits. */
1668                if (mkdir( addr.sun_path, 0775 ) == 0) {
1669                     chmod( addr.sun_path, 0775 );
1670                     if (fusion_config->shmfile_gid != (gid_t)-1)
1671                          chown( addr.sun_path, -1, fusion_config->shmfile_gid );
1672                }
1673 
1674                snprintf( addr.sun_path+len, sizeof(addr.sun_path)-len, "%lx", FUSION_ID_MASTER );
1675 
1676                /* Bind to address. */
1677                err = bind( fd, (struct sockaddr*)&addr, sizeof(addr) );
1678                if (err == 0) {
1679                     chmod( addr.sun_path, 0660 );
1680                     /* Change group, if requested. */
1681                     if (fusion_config->shmfile_gid != (gid_t)-1)
1682                          chown( addr.sun_path, -1, fusion_config->shmfile_gid );
1683                     id = FUSION_ID_MASTER;
1684                     break;
1685                }
1686           }
1687      }
1688      else {
1689           world = fusion_worlds[world_index];
1690           if (!world) {
1691                len = snprintf( addr.sun_path, sizeof(addr.sun_path), "/tmp/.fusion-%d/", world_index );
1692                /* Make socket directory if it doesn't exits. */
1693                if (mkdir( addr.sun_path, 0775 ) == 0) {
1694                     chmod( addr.sun_path, 0775 );
1695                     if (fusion_config->shmfile_gid != (gid_t)-1)
1696                          chown( addr.sun_path, -1, fusion_config->shmfile_gid );
1697                }
1698 
1699                /* Check wether we are master. */
1700                snprintf( addr.sun_path+len, sizeof(addr.sun_path)-len, "%lx", FUSION_ID_MASTER );
1701 
1702                err = bind( fd, (struct sockaddr*)&addr, sizeof(addr) );
1703                if (err < 0) {
1704                     if (role == FER_MASTER) {
1705                          D_ERROR( "Fusion/Main: Couldn't start session as master! Remove %s.\n", addr.sun_path );
1706                          ret = DR_INIT;
1707                          goto error;
1708                     }
1709 
1710                     /* Auto generate slave id. */
1711                     for (id = FUSION_ID_MASTER+1; id < (FusionID)-1; id++) {
1712                          snprintf( addr.sun_path+len, sizeof(addr.sun_path)-len, "%lx", id );
1713                          err = bind( fd, (struct sockaddr*)&addr, sizeof(addr) );
1714                          if (err == 0) {
1715                               chmod( addr.sun_path, 0660 );
1716                                /* Change group, if requested. */
1717                               if (fusion_config->shmfile_gid != (gid_t)-1)
1718                                    chown( addr.sun_path, -1, fusion_config->shmfile_gid );
1719                               break;
1720                          }
1721                     }
1722                }
1723                else if (err == 0 && role != FER_SLAVE) {
1724                     chmod( addr.sun_path, 0660 );
1725                     /* Change group, if requested. */
1726                     if (fusion_config->shmfile_gid != (gid_t)-1)
1727                          chown( addr.sun_path, -1, fusion_config->shmfile_gid );
1728                     id = FUSION_ID_MASTER;
1729                }
1730           }
1731      }
1732 
1733      /* Enter a world again? */
1734      if (world) {
1735           D_MAGIC_ASSERT( world, FusionWorld );
1736           D_ASSERT( world->refs > 0 );
1737 
1738           /* Check the role again. */
1739           switch (role) {
1740                case FER_MASTER:
1741                     if (world->fusion_id != FUSION_ID_MASTER) {
1742                          D_ERROR( "Fusion/Init: Master role requested for a world (%d) "
1743                                   "we're already slave in!\n", world_index );
1744                          ret = DR_UNSUPPORTED;
1745                          goto error;
1746                     }
1747                     break;
1748 
1749                case FER_SLAVE:
1750                     if (world->fusion_id == FUSION_ID_MASTER) {
1751                          D_ERROR( "Fusion/Init: Slave role requested for a world (%d) "
1752                                   "we're already master in!\n", world_index );
1753                          ret = DR_UNSUPPORTED;
1754                          goto error;
1755                     }
1756                     break;
1757 
1758                case FER_ANY:
1759                     break;
1760           }
1761 
1762           shared = world->shared;
1763 
1764           D_MAGIC_ASSERT( shared, FusionWorldShared );
1765 
1766           if (shared->world_abi != abi_version) {
1767                D_ERROR( "Fusion/Init: World ABI (%d) of world '%d' doesn't match own (%d)!\n",
1768                         shared->world_abi, world_index, abi_version );
1769                ret = DR_VERSIONMISMATCH;
1770                goto error;
1771           }
1772 
1773           world->refs++;
1774 
1775           pthread_mutex_unlock( &fusion_worlds_lock );
1776 
1777           D_DEBUG_AT( Fusion_Main, "  -> using existing world %p [%d]\n", world, world_index );
1778 
1779           close( fd );
1780 
1781           /* Return the world. */
1782           *ret_world = world;
1783 
1784           return DR_OK;
1785      }
1786 
1787      if (id == (FusionID)-1) {
1788           D_ERROR( "Fusion/Init: Opening fusion socket (world %d) as '%s' failed!\n", world_index,
1789                     role == FER_ANY ? "any" : (role == FER_MASTER ? "master" : "slave")  );
1790           ret = DR_INIT;
1791           goto error;
1792      }
1793 
1794      D_DEBUG_AT( Fusion_Main, "  -> Fusion ID 0x%08lx\n", id );
1795 
1796      if (id == FUSION_ID_MASTER) {
1797           int shared_fd;
1798 
1799           snprintf( buf, sizeof(buf), "%s/fusion.%d.core",
1800                     fusion_config->tmpfs ? : "/dev/shm", world_index );
1801 
1802           /* Open shared memory file. */
1803           shared_fd = open( buf, O_RDWR | O_CREAT | O_TRUNC, 0660 );
1804           if (shared_fd < 0) {
1805                D_PERROR( "Fusion/Init: Couldn't open shared memory file!\n" );
1806                ret = DR_INIT;
1807                goto error;
1808           }
1809 
1810           if (fusion_config->shmfile_gid != (gid_t)-1) {
1811                if (fchown( shared_fd, -1, fusion_config->shmfile_gid ) != 0)
1812                     D_INFO( "Fusion/Init: Changing owner on %s failed... continuing on.\n", buf );
1813           }
1814 
1815           fchmod( shared_fd, 0660 );
1816           ftruncate( shared_fd, sizeof(FusionWorldShared) );
1817 
1818           /* Map shared area. */
1819           shared = mmap( (void*) 0x20000000 + 0x2000 * world_index, sizeof(FusionWorldShared),
1820                          PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, shared_fd, 0 );
1821           if (shared == MAP_FAILED) {
1822                D_PERROR( "Fusion/Init: Mapping shared area failed!\n" );
1823                close( shared_fd );
1824                ret = DR_INIT;
1825                goto error;
1826           }
1827 
1828           close( shared_fd );
1829 
1830           D_DEBUG_AT( Fusion_Main, "  -> shared area at %p, size %zu\n", shared, sizeof(FusionWorldShared) );
1831 
1832           /* Initialize reference counter. */
1833           shared->refs = 1;
1834 
1835           /* Set ABI version. */
1836           shared->world_abi = abi_version;
1837 
1838           /* Set the world index. */
1839           shared->world_index = world_index;
1840 
1841           /* Set pool allocation base/max. */
1842           shared->pool_base = (void*)0x20000000 + 0x2000 * FUSION_MAX_WORLDS + 0x8000000 * world_index;
1843           shared->pool_max  = shared->pool_base + 0x8000000 - 1;
1844 
1845           /* Set start time of world clock. */
1846           gettimeofday( &shared->start_time, NULL );
1847 
1848           D_MAGIC_SET( shared, FusionWorldShared );
1849      }
1850      else {
1851           FusionEnter enter;
1852           int         shared_fd;
1853 
1854           /* Fill enter information. */
1855           enter.type      = FMT_ENTER;
1856           enter.fusion_id = id;
1857 
1858           snprintf( addr.sun_path, sizeof(addr.sun_path),
1859                     "/tmp/.fusion-%d/%lx", world_index, FUSION_ID_MASTER );
1860 
1861           /* Send enter message (used to sync with the master) */
1862           ret = _fusion_send_message( fd, &enter, sizeof(FusionEnter), &addr );
1863           if (ret == DR_OK) {
1864                ret = _fusion_recv_message( fd, &enter, sizeof(FusionEnter), NULL );
1865                if (ret == DR_OK && enter.type != FMT_ENTER) {
1866                     D_ERROR( "Fusion/Init: Expected message ENTER, got '%d'!\n", enter.type );
1867                     ret = DR_FUSION;
1868                }
1869           }
1870 
1871           if (ret) {
1872                D_ERROR( "Fusion/Init: Could not enter world '%d'!\n", world_index );
1873                goto error;
1874           }
1875 
1876           snprintf( buf, sizeof(buf), "%s/fusion.%d.core",
1877                     fusion_config->tmpfs ? : "/dev/shm", world_index );
1878 
1879           /* Open shared memory file. */
1880           shared_fd = open( buf, O_RDWR );
1881           if (shared_fd < 0) {
1882                D_PERROR( "Fusion/Init: Couldn't open shared memory file!\n" );
1883                ret = DR_INIT;
1884                goto error;
1885           }
1886 
1887           /* Map shared area. */
1888           shared = mmap( (void*) 0x20000000 + 0x2000 * world_index, sizeof(FusionWorldShared),
1889                          PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, shared_fd, 0 );
1890           if (shared == MAP_FAILED) {
1891                D_PERROR( "Fusion/Init: Mapping shared area failed!\n" );
1892                close( shared_fd );
1893                ret = DR_INIT;
1894                goto error;
1895           }
1896 
1897           close( shared_fd );
1898 
1899           D_DEBUG_AT( Fusion_Main, "  -> shared area at %p, size %zu\n", shared, sizeof(FusionWorldShared) );
1900 
1901           D_MAGIC_ASSERT( shared, FusionWorldShared );
1902 
1903           /* Check ABI version. */
1904           if (shared->world_abi != abi_version) {
1905                D_ERROR( "Fusion/Init: World ABI (%d) doesn't match own (%d)!\n",
1906                         shared->world_abi, abi_version );
1907                ret = DR_VERSIONMISMATCH;
1908                goto error;
1909           }
1910      }
1911 
1912      /* Synchronize to world clock. */
1913      direct_clock_set_start( &shared->start_time );
1914 
1915      /* Allocate local data. */
1916      world = D_CALLOC( 1, sizeof(FusionWorld) );
1917      if (!world) {
1918           ret = D_OOM();
1919           goto error;
1920      }
1921 
1922      /* Initialize local data. */
1923      world->refs      = 1;
1924      world->shared    = shared;
1925      world->fusion_fd = fd;
1926      world->fusion_id = id;
1927 
1928      D_MAGIC_SET( world, FusionWorld );
1929 
1930      fusion_worlds[world_index] = world;
1931 
1932      /* Initialize shared memory part. */
1933      ret = fusion_shm_init( world );
1934      if (ret)
1935           goto error2;
1936 
1937      D_DEBUG_AT( Fusion_Main, "  -> initializing other parts...\n" );
1938 
1939      /* Initialize other parts. */
1940      if (world->fusion_id == FUSION_ID_MASTER) {
1941           fusion_skirmish_init( &shared->arenas_lock, "Fusion Arenas", world );
1942           fusion_skirmish_init( &shared->reactor_globals, "Fusion Reactor Globals", world );
1943           fusion_skirmish_init( &shared->fusionees_lock, "Fusionees", world );
1944 
1945           /* Create the main pool. */
1946           ret = fusion_shm_pool_create( world, "Fusion Main Pool", 0x100000,
1947                                         fusion_config->debugshm, &shared->main_pool );
1948           if (ret)
1949                goto error3;
1950      }
1951 
1952      /* Add ourselves to the list of fusionees. */
1953      ret = _fusion_add_fusionee( world, id );
1954      if (ret)
1955           goto error4;
1956 
1957      D_DEBUG_AT( Fusion_Main, "  -> starting dispatcher loop...\n" );
1958 
1959      /* Start the dispatcher thread. */
1960      world->dispatch_loop = direct_thread_create( DTT_MESSAGING,
1961                                                   fusion_dispatch_loop,
1962                                                   world, "Fusion Dispatch" );
1963      if (!world->dispatch_loop) {
1964           ret = DR_FAILURE;
1965           goto error5;
1966      }
1967 
1968      D_DEBUG_AT( Fusion_Main, "  -> done. (%p)\n", world );
1969 
1970      pthread_mutex_unlock( &fusion_worlds_lock );
1971 
1972      /* Return the fusion world. */
1973      *ret_world = world;
1974 
1975      return DR_OK;
1976 
1977 
1978 error5:
1979      if (world->dispatch_loop)
1980           direct_thread_destroy( world->dispatch_loop );
1981 
1982      _fusion_remove_fusionee( world, id );
1983 
1984 error4:
1985      if (world->fusion_id == FUSION_ID_MASTER)
1986           fusion_shm_pool_destroy( world, shared->main_pool );
1987 
1988 error3:
1989      if (world->fusion_id == FUSION_ID_MASTER) {
1990           fusion_skirmish_destroy( &shared->arenas_lock );
1991           fusion_skirmish_destroy( &shared->reactor_globals );
1992           fusion_skirmish_destroy( &shared->fusionees_lock );
1993      }
1994 
1995      fusion_shm_deinit( world );
1996 
1997 
1998 error2:
1999      fusion_worlds[world_index] = world;
2000 
2001      D_MAGIC_CLEAR( world );
2002 
2003      D_FREE( world );
2004 
2005 error:
2006      if (shared != MAP_FAILED) {
2007           if (id == FUSION_ID_MASTER)
2008                D_MAGIC_CLEAR( shared );
2009 
2010           munmap( shared, sizeof(FusionWorldShared) );
2011      }
2012 
2013      if (fd != -1) {
2014           /* Unbind. */
2015           socklen_t len = sizeof(addr);
2016           if (getsockname( fd, (struct sockaddr*)&addr, &len ) == 0)
2017                unlink( addr.sun_path );
2018 
2019           close( fd );
2020      }
2021 
2022      pthread_mutex_unlock( &fusion_worlds_lock );
2023 
2024      direct_shutdown();
2025 
2026      return ret;
2027 }
2028 
2029 DirectResult
fusion_stop_dispatcher(FusionWorld * world,bool emergency)2030 fusion_stop_dispatcher( FusionWorld *world,
2031                         bool         emergency )
2032 {
2033      if (!emergency) {
2034           fusion_sync( world );
2035 
2036           direct_thread_lock( world->dispatch_loop );
2037      }
2038 
2039      world->dispatch_stop = true;
2040 
2041      if (!emergency) {
2042           direct_thread_unlock( world->dispatch_loop );
2043 
2044           fusion_sync( world );
2045      }
2046 
2047      return DR_OK;
2048 }
2049 
2050 /*
2051  * Exits the fusion world.
2052  *
2053  * If 'emergency' is true the function won't join but kill the dispatcher thread.
2054  */
2055 DirectResult
fusion_exit(FusionWorld * world,bool emergency)2056 fusion_exit( FusionWorld *world,
2057              bool         emergency )
2058 {
2059      FusionWorldShared *shared;
2060      int                world_index;
2061      bool               clear = false;
2062 
2063      D_DEBUG_AT( Fusion_Main, "%s( %p, %semergency )\n", __FUNCTION__, world, emergency ? "" : "no " );
2064 
2065      D_MAGIC_ASSERT( world, FusionWorld );
2066 
2067      shared = world->shared;
2068 
2069      D_MAGIC_ASSERT( shared, FusionWorldShared );
2070 
2071      world_index = shared->world_index;
2072 
2073      pthread_mutex_lock( &fusion_worlds_lock );
2074 
2075      D_ASSERT( world->refs > 0 );
2076 
2077      if (--world->refs) {
2078           pthread_mutex_unlock( &fusion_worlds_lock );
2079           return DR_OK;
2080      }
2081 
2082      if (!emergency) {
2083           FusionMessageType msg = FMT_SEND;
2084 
2085           /* Wakeup dispatcher. */
2086           if (_fusion_send_message( world->fusion_fd, &msg, sizeof(msg), NULL ))
2087                direct_thread_cancel( world->dispatch_loop );
2088 
2089           /* Wait for its termination. */
2090           direct_thread_join( world->dispatch_loop );
2091      }
2092 
2093      direct_thread_destroy( world->dispatch_loop );
2094 
2095      /* Remove ourselves from list. */
2096      if (!emergency || fusion_master( world )) {
2097           _fusion_remove_fusionee( world, world->fusion_id );
2098      }
2099      else {
2100           struct sockaddr_un addr;
2101           FusionLeave        leave;
2102 
2103           addr.sun_family = AF_UNIX;
2104           snprintf( addr.sun_path, sizeof(addr.sun_path),
2105                     "/tmp/.fusion-%d/%lx", world_index, FUSION_ID_MASTER );
2106 
2107           leave.type      = FMT_LEAVE;
2108           leave.fusion_id = world->fusion_id;
2109 
2110           _fusion_send_message( world->fusion_fd, &leave, sizeof(FusionLeave), &addr );
2111      }
2112 
2113      /* Master has to deinitialize shared data. */
2114      if (fusion_master( world )) {
2115           shared->refs--;
2116           if (shared->refs == 0) {
2117                fusion_skirmish_destroy( &shared->reactor_globals );
2118                fusion_skirmish_destroy( &shared->arenas_lock );
2119                fusion_skirmish_destroy( &shared->fusionees_lock );
2120 
2121                fusion_shm_pool_destroy( world, shared->main_pool );
2122 
2123                /* Deinitialize shared memory. */
2124                fusion_shm_deinit( world );
2125 
2126                clear = true;
2127           }
2128      }
2129      else {
2130           /* Leave shared memory. */
2131           fusion_shm_deinit( world );
2132      }
2133 
2134      /* Reset local dispatch nodes. */
2135      _fusion_reactor_free_all( world );
2136 
2137      /* Remove world from global list. */
2138      fusion_worlds[shared->world_index] = NULL;
2139 
2140      /* Unmap shared area. */
2141      if (clear)
2142           D_MAGIC_CLEAR( shared );
2143 
2144      munmap( shared, sizeof(FusionWorldShared) );
2145 
2146      /* Close socket. */
2147      close( world->fusion_fd );
2148 
2149      if (clear) {
2150           DIR  *dir;
2151           char  buf[128];
2152           int   len;
2153 
2154           /* Remove core shmfile. */
2155           snprintf( buf, sizeof(buf), "%s/fusion.%d.core",
2156                     fusion_config->tmpfs ? : "/dev/shm", world_index );
2157           D_DEBUG_AT( Fusion_Main, "Removing shmfile %s.\n", buf );
2158           unlink( buf );
2159 
2160           /* Cleanup socket directory. */
2161           len = snprintf( buf, sizeof(buf), "/tmp/.fusion-%d/", world_index );
2162           dir = opendir( buf );
2163           if (dir) {
2164                struct dirent *entry = NULL;
2165                struct dirent  tmp;
2166 
2167                while (readdir_r( dir, &tmp, &entry ) == 0 && entry) {
2168                     if (entry->d_name[0] != '.') {
2169                          struct stat st;
2170 
2171                          direct_snputs( buf+len, entry->d_name, sizeof(buf)-len );
2172                          if (stat( buf, &st ) == 0 && S_ISSOCK(st.st_mode)) {
2173                               D_DEBUG_AT( Fusion_Main, "Removing socket %s.\n", buf );
2174                               unlink( buf );
2175                          }
2176                     }
2177                }
2178 
2179                closedir( dir );
2180           }
2181           else {
2182                D_PERROR( "Fusion/Main: Couldn't open socket directory %s", buf );
2183           }
2184      }
2185 
2186      /* Free local world data. */
2187      D_MAGIC_CLEAR( world );
2188      D_FREE( world );
2189 
2190      D_DEBUG_AT( Fusion_Main, "%s( %p ) done.\n", __FUNCTION__, world );
2191 
2192      pthread_mutex_unlock( &fusion_worlds_lock );
2193 
2194      direct_shutdown();
2195 
2196      return DR_OK;
2197 }
2198 
2199 /*
2200  * Sends a signal to one or more fusionees and optionally waits
2201  * for their processes to terminate.
2202  *
2203  * A fusion_id of zero means all fusionees but the calling one.
2204  * A timeout of zero means infinite waiting while a negative value
2205  * means no waiting at all.
2206  */
2207 DirectResult
fusion_kill(FusionWorld * world,FusionID fusion_id,int signal,int timeout_ms)2208 fusion_kill( FusionWorld *world,
2209              FusionID     fusion_id,
2210              int          signal,
2211              int          timeout_ms )
2212 {
2213      FusionWorldShared *shared;
2214      __Fusionee        *fusionee, *temp;
2215      int                result;
2216 
2217      D_DEBUG_AT( Fusion_Main, "%s( %p, %lu, %d, %d )\n",
2218                  __FUNCTION__, world, fusion_id, signal, timeout_ms );
2219 
2220      D_MAGIC_ASSERT( world, FusionWorld );
2221 
2222      shared = world->shared;
2223 
2224      D_MAGIC_ASSERT( shared, FusionWorldShared );
2225 
2226      fusion_skirmish_prevail( &shared->fusionees_lock );
2227 
2228      direct_list_foreach_safe (fusionee, temp, shared->fusionees) {
2229           if (fusion_id == 0 && fusionee->id == world->fusion_id)
2230                continue;
2231 
2232           if (fusion_id != 0 && fusionee->id != fusion_id)
2233                continue;
2234 
2235           D_DEBUG_AT( Fusion_Main, " -> killing fusionee %lu (%d)...\n", fusionee->id, fusionee->pid );
2236 
2237           result = kill( fusionee->pid, signal );
2238           if (result == 0 && timeout_ms >= 0) {
2239                pid_t     pid  = fusionee->pid;
2240                long long stop = timeout_ms ? (direct_clock_get_micros() + timeout_ms*1000) : 0;
2241 
2242                fusion_skirmish_dismiss( &shared->fusionees_lock );
2243 
2244                while (kill( pid, 0 ) == 0) {
2245                     usleep( 1000 );
2246 
2247                     if (timeout_ms && direct_clock_get_micros() >= stop)
2248                          break;
2249                };
2250 
2251                fusion_skirmish_prevail( &shared->fusionees_lock );
2252           }
2253           else if (result < 0) {
2254                if (errno == ESRCH) {
2255                     D_DEBUG_AT( Fusion_Main, " ... fusionee %lu exited without removing itself!\n", fusionee->id );
2256 
2257                     _fusion_remove_fusionee( world, fusionee->id );
2258                }
2259                else {
2260                     D_PERROR( "Fusion/Main: kill(%d, %d)\n", fusionee->pid, signal );
2261                }
2262           }
2263      }
2264 
2265      fusion_skirmish_dismiss( &shared->fusionees_lock );
2266 
2267      return DR_OK;
2268 }
2269 
2270 /**********************************************************************************************************************/
2271 
2272 static void *
fusion_dispatch_loop(DirectThread * self,void * arg)2273 fusion_dispatch_loop( DirectThread *self, void *arg )
2274 {
2275      FusionWorld        *world = arg;
2276      struct sockaddr_un  addr;
2277      socklen_t           addr_len = sizeof(addr);
2278      fd_set              set;
2279      char                buf[FUSION_MESSAGE_SIZE];
2280 
2281      D_DEBUG_AT( Fusion_Main_Dispatch, "%s() running...\n", __FUNCTION__ );
2282 
2283      while (true) {
2284           int result;
2285 
2286           D_MAGIC_ASSERT( world, FusionWorld );
2287 
2288           FD_ZERO( &set );
2289           FD_SET( world->fusion_fd, &set );
2290 
2291           result = select( world->fusion_fd + 1, &set, NULL, NULL, NULL );
2292           if (result < 0) {
2293                switch (errno) {
2294                     case EINTR:
2295                          continue;
2296 
2297                     default:
2298                          D_PERROR( "Fusion/Dispatcher: select() failed!\n" );
2299                          return NULL;
2300                }
2301           }
2302 
2303           D_MAGIC_ASSERT( world, FusionWorld );
2304 
2305           if (FD_ISSET( world->fusion_fd, &set ) &&
2306               recvfrom( world->fusion_fd, buf, sizeof(buf), 0, (struct sockaddr*)&addr, &addr_len ) > 0) {
2307                FusionMessage *msg = (FusionMessage*)buf;
2308 
2309                pthread_setcancelstate( PTHREAD_CANCEL_DISABLE, NULL );
2310 
2311                D_DEBUG_AT( Fusion_Main_Dispatch, " -> message from '%s'...\n", addr.sun_path );
2312 
2313                direct_thread_lock( world->dispatch_loop );
2314 
2315                if (world->dispatch_stop) {
2316                     D_DEBUG_AT( Fusion_Main_Dispatch, "  -> IGNORING (dispatch_stop!)\n" );
2317                }
2318                else {
2319                     switch (msg->type) {
2320                          case FMT_SEND:
2321                               D_DEBUG_AT( Fusion_Main_Dispatch, "  -> FMT_SEND...\n" );
2322                               break;
2323 
2324                          case FMT_ENTER:
2325                               D_DEBUG_AT( Fusion_Main_Dispatch, "  -> FMT_ENTER...\n" );
2326                               if (!fusion_master( world )) {
2327                                    D_ERROR( "Fusion/Dispatch: Got ENTER request, but I'm not master!\n" );
2328                                    break;
2329                               }
2330                               if (msg->enter.fusion_id == world->fusion_id) {
2331                                    D_ERROR( "Fusion/Dispatch: Received ENTER request from myself!\n" );
2332                                    break;
2333                               }
2334                               /* Nothing to do here. Send back message. */
2335                               _fusion_send_message( world->fusion_fd, msg, sizeof(FusionEnter), &addr );
2336                               break;
2337 
2338                          case FMT_LEAVE:
2339                               D_DEBUG_AT( Fusion_Main_Dispatch, "  -> FMT_LEAVE...\n" );
2340                               if (!fusion_master( world )) {
2341                                    D_ERROR( "Fusion/Dispatch: Got LEAVE request, but I'm not master!\n" );
2342                                    break;
2343                               }
2344                               if (msg->leave.fusion_id == world->fusion_id) {
2345                                    D_ERROR( "Fusion/Dispatch: Received LEAVE request from myself!\n" );
2346                                    break;
2347                               }
2348                               _fusion_remove_fusionee( world, msg->leave.fusion_id );
2349                               break;
2350 
2351                          case FMT_CALL:
2352                               D_DEBUG_AT( Fusion_Main_Dispatch, "  -> FMT_CALL...\n" );
2353                               _fusion_call_process( world, msg->call.call_id, &msg->call );
2354                               break;
2355 
2356                          case FMT_REACTOR:
2357                               D_DEBUG_AT( Fusion_Main_Dispatch, "  -> FMT_REACTOR...\n" );
2358                               _fusion_reactor_process_message( world, msg->reactor.id, msg->reactor.channel,
2359                                                                &buf[sizeof(FusionReactorMessage)] );
2360                               if (msg->reactor.ref) {
2361                                    fusion_ref_down( msg->reactor.ref, true );
2362                                    if (fusion_ref_zero_trylock( msg->reactor.ref ) == DR_OK) {
2363                                         fusion_ref_destroy( msg->reactor.ref );
2364                                         SHFREE( world->shared->main_pool, msg->reactor.ref );
2365                                    }
2366                               }
2367                               break;
2368 
2369                          default:
2370                               D_BUG( "unexpected message type (%d)", msg->type );
2371                               break;
2372                     }
2373                }
2374 
2375                direct_thread_unlock( world->dispatch_loop );
2376 
2377                if (!world->refs) {
2378                     D_DEBUG_AT( Fusion_Main_Dispatch, "  -> good bye!\n" );
2379                     return NULL;
2380                }
2381 
2382                D_DEBUG_AT( Fusion_Main_Dispatch, " ...done\n" );
2383 
2384                pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL );
2385           }
2386      }
2387 
2388      return NULL;
2389 }
2390 
2391 /**********************************************************************************************************************/
2392 
2393 #endif /* FUSION_BUILD_KERNEL */
2394 
2395 /*
2396  * Wait until all pending messages are processed.
2397  */
2398 DirectResult
fusion_sync(const FusionWorld * world)2399 fusion_sync( const FusionWorld *world )
2400 {
2401      D_MAGIC_ASSERT( world, FusionWorld );
2402 
2403      D_DEBUG_AT( Fusion_Main, "%s( %p )\n", __FUNCTION__, world );
2404 
2405      D_DEBUG_AT( Fusion_Main, "  -> syncing with fusion device...\n" );
2406 
2407      while (ioctl( world->fusion_fd, FUSION_SYNC )) {
2408           switch (errno) {
2409                case EINTR:
2410                     continue;
2411                default:
2412                     break;
2413           }
2414 
2415           D_PERROR( "Fusion/Main: FUSION_SYNC failed!\n" );
2416 
2417           return DR_FAILURE;
2418      }
2419 
2420      D_DEBUG_AT( Fusion_Main, "  -> synced\n");
2421 
2422      return DR_OK;
2423 }
2424 
2425 /*
2426  * Sets the fork() action of the calling Fusionee within the world.
2427  */
2428 void
fusion_world_set_fork_action(FusionWorld * world,FusionForkAction action)2429 fusion_world_set_fork_action( FusionWorld      *world,
2430                               FusionForkAction  action )
2431 {
2432      D_MAGIC_ASSERT( world, FusionWorld );
2433 
2434      world->fork_action = action;
2435 }
2436 
2437 /*
2438  * Gets the current fork() action.
2439  */
2440 FusionForkAction
fusion_world_get_fork_action(FusionWorld * world)2441 fusion_world_get_fork_action( FusionWorld *world )
2442 {
2443      D_MAGIC_ASSERT( world, FusionWorld );
2444 
2445      return world->fork_action;
2446 }
2447 
2448 /*
2449  * Registers a callback called upon fork().
2450  */
2451 void
fusion_world_set_fork_callback(FusionWorld * world,FusionForkCallback callback)2452 fusion_world_set_fork_callback( FusionWorld        *world,
2453                                 FusionForkCallback  callback )
2454 {
2455      D_MAGIC_ASSERT( world, FusionWorld );
2456 
2457      world->fork_callback = callback;
2458 }
2459 
2460 /*
2461  * Registers a callback called when a slave exits.
2462  */
2463 void
fusion_world_set_leave_callback(FusionWorld * world,FusionLeaveCallback callback,void * ctx)2464 fusion_world_set_leave_callback( FusionWorld         *world,
2465                                  FusionLeaveCallback  callback,
2466                                  void                *ctx )
2467 {
2468      D_MAGIC_ASSERT( world, FusionWorld );
2469 
2470      world->leave_callback = callback;
2471      world->leave_ctx      = ctx;
2472 }
2473 
2474 /*
2475  * Return the index of the specified world.
2476  */
2477 int
fusion_world_index(const FusionWorld * world)2478 fusion_world_index( const FusionWorld *world )
2479 {
2480      FusionWorldShared *shared;
2481 
2482      D_MAGIC_ASSERT( world, FusionWorld );
2483 
2484      shared = world->shared;
2485 
2486      D_MAGIC_ASSERT( shared, FusionWorldShared );
2487 
2488      return shared->world_index;
2489 }
2490 
2491 /*
2492  * Return the own Fusion ID within the specified world.
2493  */
2494 FusionID
fusion_id(const FusionWorld * world)2495 fusion_id( const FusionWorld *world )
2496 {
2497      D_MAGIC_ASSERT( world, FusionWorld );
2498 
2499      return world->fusion_id;
2500 }
2501 
2502 /*
2503  * Return if the world is a multi application world.
2504  */
2505 bool
fusion_is_multi(const FusionWorld * world)2506 fusion_is_multi( const FusionWorld *world )
2507 {
2508      D_MAGIC_ASSERT( world, FusionWorld );
2509 
2510      return true;
2511 }
2512 
2513 /*
2514  * Return the thread ID of the Fusion Dispatcher within the specified world.
2515  */
2516 pid_t
fusion_dispatcher_tid(const FusionWorld * world)2517 fusion_dispatcher_tid( const FusionWorld *world )
2518 {
2519      D_MAGIC_ASSERT( world, FusionWorld );
2520 
2521      return direct_thread_get_tid( world->dispatch_loop );
2522 }
2523 
2524 /*
2525  * Return true if this process is the master.
2526  */
2527 bool
fusion_master(const FusionWorld * world)2528 fusion_master( const FusionWorld *world )
2529 {
2530      D_MAGIC_ASSERT( world, FusionWorld );
2531 
2532      return world->fusion_id == FUSION_ID_MASTER;
2533 }
2534 
2535 /*
2536  * Check if a pointer points to the shared memory.
2537  */
2538 bool
fusion_is_shared(FusionWorld * world,const void * ptr)2539 fusion_is_shared( FusionWorld *world,
2540                   const void  *ptr )
2541 {
2542      int              i;
2543      DirectResult     ret;
2544      FusionSHM       *shm;
2545      FusionSHMShared *shared;
2546 
2547      D_MAGIC_ASSERT( world, FusionWorld );
2548 
2549      shm = &world->shm;
2550 
2551      D_MAGIC_ASSERT( shm, FusionSHM );
2552 
2553      shared = shm->shared;
2554 
2555      D_MAGIC_ASSERT( shared, FusionSHMShared );
2556 
2557      if (ptr >= (void*) world->shared && ptr < (void*) world->shared + sizeof(FusionWorldShared))
2558           return true;
2559 
2560      ret = fusion_skirmish_prevail( &shared->lock );
2561      if (ret)
2562           return false;
2563 
2564      for (i=0; i<FUSION_SHM_MAX_POOLS; i++) {
2565           if (shared->pools[i].active) {
2566                shmalloc_heap       *heap;
2567                FusionSHMPoolShared *pool = &shared->pools[i];
2568 
2569                D_MAGIC_ASSERT( pool, FusionSHMPoolShared );
2570 
2571                heap = pool->heap;
2572 
2573                D_MAGIC_ASSERT( heap, shmalloc_heap );
2574 
2575                if (ptr >= pool->addr_base && ptr < pool->addr_base + heap->size) {
2576                     fusion_skirmish_dismiss( &shared->lock );
2577                     return true;
2578                }
2579           }
2580      }
2581 
2582      fusion_skirmish_dismiss( &shared->lock );
2583 
2584      return false;
2585 }
2586 
2587 #else /* FUSION_BUILD_MULTI */
2588 
2589 /*
2590  * Enters a fusion world by joining or creating it.
2591  *
2592  * If <b>world_index</b> is negative, the next free index is used to create a new world.
2593  * Otherwise the world with the specified index is joined or created.
2594  */
fusion_enter(int world_index,int abi_version,FusionEnterRole role,FusionWorld ** ret_world)2595 DirectResult fusion_enter( int               world_index,
2596                            int               abi_version,
2597                            FusionEnterRole   role,
2598                            FusionWorld     **ret_world )
2599 {
2600      DirectResult  ret;
2601      FusionWorld  *world = NULL;
2602 
2603      D_ASSERT( ret_world != NULL );
2604 
2605      ret = direct_initialize();
2606      if (ret)
2607           return ret;
2608 
2609      world = D_CALLOC( 1, sizeof(FusionWorld) );
2610      if (!world) {
2611           ret = D_OOM();
2612           goto error;
2613      }
2614 
2615      world->shared = D_CALLOC( 1, sizeof(FusionWorldShared) );
2616      if (!world->shared) {
2617           ret = D_OOM();
2618           goto error;
2619      }
2620 
2621      /* Create the main pool. */
2622      ret = fusion_shm_pool_create( world, "Fusion Main Pool", 0x100000,
2623                                    fusion_config->debugshm, &world->shared->main_pool );
2624      if (ret)
2625           goto error;
2626 
2627      D_MAGIC_SET( world, FusionWorld );
2628      D_MAGIC_SET( world->shared, FusionWorldShared );
2629 
2630      *ret_world = world;
2631 
2632      return DR_OK;
2633 
2634 
2635 error:
2636      if (world) {
2637           if (world->shared)
2638                D_FREE( world->shared );
2639 
2640           D_FREE( world );
2641      }
2642 
2643      direct_shutdown();
2644 
2645      return ret;
2646 }
2647 
2648 DirectResult
fusion_stop_dispatcher(FusionWorld * world,bool emergency)2649 fusion_stop_dispatcher( FusionWorld *world,
2650                         bool         emergency )
2651 {
2652      return DR_OK;
2653 }
2654 
2655 /*
2656  * Exits the fusion world.
2657  *
2658  * If 'emergency' is true the function won't join but kill the dispatcher thread.
2659  */
2660 DirectResult
fusion_exit(FusionWorld * world,bool emergency)2661 fusion_exit( FusionWorld *world,
2662              bool         emergency )
2663 {
2664      D_MAGIC_ASSERT( world, FusionWorld );
2665      D_MAGIC_ASSERT( world->shared, FusionWorldShared );
2666 
2667      fusion_shm_pool_destroy( world, world->shared->main_pool );
2668 
2669      D_MAGIC_CLEAR( world->shared );
2670 
2671      D_FREE( world->shared );
2672 
2673      D_MAGIC_CLEAR( world );
2674 
2675      D_FREE( world );
2676 
2677      direct_shutdown();
2678 
2679      return DR_OK;
2680 }
2681 
2682 /*
2683  * Sets the fork() action of the calling Fusionee within the world.
2684  */
2685 void
fusion_world_set_fork_action(FusionWorld * world,FusionForkAction action)2686 fusion_world_set_fork_action( FusionWorld      *world,
2687                               FusionForkAction  action )
2688 {
2689      D_MAGIC_ASSERT( world, FusionWorld );
2690 }
2691 
2692 /*
2693  * Gets the current fork() action.
2694  */
2695 FusionForkAction
fusion_world_get_fork_action(FusionWorld * world)2696 fusion_world_get_fork_action( FusionWorld *world )
2697 {
2698      D_MAGIC_ASSERT( world, FusionWorld );
2699 
2700      return world->fork_action;
2701 }
2702 
2703 /*
2704  * Registers a callback called upon fork().
2705  */
2706 void
fusion_world_set_fork_callback(FusionWorld * world,FusionForkCallback callback)2707 fusion_world_set_fork_callback( FusionWorld        *world,
2708                                 FusionForkCallback  callback )
2709 {
2710      D_MAGIC_ASSERT( world, FusionWorld );
2711 }
2712 
2713 /*
2714  * Registers a callback called when a slave exits.
2715  */
2716 void
fusion_world_set_leave_callback(FusionWorld * world,FusionLeaveCallback callback,void * ctx)2717 fusion_world_set_leave_callback( FusionWorld         *world,
2718                                  FusionLeaveCallback  callback,
2719                                  void                *ctx )
2720 {
2721      D_MAGIC_ASSERT( world, FusionWorld );
2722 }
2723 
2724 /*
2725  * Return the index of the specified world.
2726  */
2727 int
fusion_world_index(const FusionWorld * world)2728 fusion_world_index( const FusionWorld *world )
2729 {
2730      D_MAGIC_ASSERT( world, FusionWorld );
2731 
2732      return 0;
2733 }
2734 
2735 
2736 /*
2737  * Return true if this process is the master.
2738  */
2739 bool
fusion_master(const FusionWorld * world)2740 fusion_master( const FusionWorld *world )
2741 {
2742      D_MAGIC_ASSERT( world, FusionWorld );
2743 
2744      return true;
2745 }
2746 
2747 /*
2748  * Sends a signal to one or more fusionees and optionally waits
2749  * for their processes to terminate.
2750  *
2751  * A fusion_id of zero means all fusionees but the calling one.
2752  * A timeout of zero means infinite waiting while a negative value
2753  * means no waiting at all.
2754  */
2755 DirectResult
fusion_kill(FusionWorld * world,FusionID fusion_id,int signal,int timeout_ms)2756 fusion_kill( FusionWorld *world,
2757              FusionID     fusion_id,
2758              int          signal,
2759              int          timeout_ms )
2760 {
2761      D_MAGIC_ASSERT( world, FusionWorld );
2762 
2763      return DR_OK;
2764 }
2765 
2766 /*
2767  * Return the own Fusion ID within the specified world.
2768  */
2769 FusionID
fusion_id(const FusionWorld * world)2770 fusion_id( const FusionWorld *world )
2771 {
2772      D_MAGIC_ASSERT( world, FusionWorld );
2773 
2774      return 1;
2775 }
2776 
2777 /*
2778  * Return if the world is a multi application world.
2779  */
2780 bool
fusion_is_multi(const FusionWorld * world)2781 fusion_is_multi( const FusionWorld *world )
2782 {
2783      D_MAGIC_ASSERT( world, FusionWorld );
2784 
2785      return false;
2786 }
2787 
2788 /*
2789  * Return the thread ID of the Fusion Dispatcher within the specified world.
2790  */
2791 pid_t
fusion_dispatcher_tid(const FusionWorld * world)2792 fusion_dispatcher_tid( const FusionWorld *world )
2793 {
2794      D_MAGIC_ASSERT( world, FusionWorld );
2795 
2796      // must return 0 here to not let the dispatcher thread sleep in CoreDFB_CallMode()
2797      return 0;
2798 }
2799 
2800 /*
2801  * Wait until all pending messages are processed.
2802  */
2803 DirectResult
fusion_sync(const FusionWorld * world)2804 fusion_sync( const FusionWorld *world )
2805 {
2806      D_MAGIC_ASSERT( world, FusionWorld );
2807 
2808      return DR_OK;
2809 }
2810 
2811 /* Check if a pointer points to the shared memory. */
2812 bool
fusion_is_shared(FusionWorld * world,const void * ptr)2813 fusion_is_shared( FusionWorld *world,
2814                   const void  *ptr )
2815 {
2816      D_MAGIC_ASSERT( world, FusionWorld );
2817 
2818      return true;
2819 }
2820 
2821 const char *
fusion_get_tmpfs(FusionWorld * world)2822 fusion_get_tmpfs( FusionWorld *world )
2823 {
2824      D_MAGIC_ASSERT( world, FusionWorld );
2825      D_MAGIC_ASSERT( world->shared, FusionWorldShared );
2826 
2827      return "/tmp";
2828 }
2829 
2830 DirectResult
fusion_get_fusionee_path(const FusionWorld * world,FusionID fusion_id,char * buf,size_t buf_size,size_t * ret_size)2831 fusion_get_fusionee_path( const FusionWorld *world,
2832                           FusionID           fusion_id,
2833                           char              *buf,
2834                           size_t             buf_size,
2835                           size_t            *ret_size )
2836 {
2837      D_UNIMPLEMENTED();
2838 
2839      return DR_UNIMPLEMENTED;
2840 }
2841 
2842 #endif
2843 
2844