1 /*
2    (c) Copyright 2001-2009  The world wide DirectFB Open Source Community (directfb.org)
3    (c) Copyright 2000-2004  Convergence (integrated media) GmbH
4 
5    All rights reserved.
6 
7    Written by Denis Oliver Kropp <dok@directfb.org>,
8               Andreas Hundt <andi@fischlustig.de>,
9               Sven Neumann <neo@directfb.org>,
10               Ville Syrjälä <syrjala@sci.fi> and
11               Claudio Ciccani <klan@users.sf.net>.
12 
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 2 of the License, or (at your option) any later version.
17 
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22 
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, write to the
25    Free Software Foundation, Inc., 59 Temple Place - Suite 330,
26    Boston, MA 02111-1307, USA.
27 */
28 
29 #include <config.h>
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include <unistd.h>
33 #include <errno.h>
34 #include <signal.h>
35 
36 #include <sys/param.h>
37 
38 #include <pthread.h>
39 
40 #include <fusion/build.h>
41 
42 #include <direct/debug.h>
43 #include <direct/list.h>
44 #include <direct/mem.h>
45 #include <direct/messages.h>
46 #include <direct/thread.h>
47 #include <direct/trace.h>
48 #include <direct/util.h>
49 
50 #include <fusion/types.h>
51 #include <fusion/lock.h>
52 #include <fusion/shmalloc.h>
53 #include <fusion/reactor.h>
54 
55 #include "fusion_internal.h"
56 
57 
58 #if FUSION_BUILD_MULTI
59 
60 D_DEBUG_DOMAIN( Fusion_Reactor, "Fusion/Reactor", "Fusion's Reactor" );
61 
62 struct __Fusion_FusionReactor {
63      int                magic;
64 
65      int                id;        /* reactor id                          */
66      int                msg_size;  /* size of each message                */
67      bool               direct;
68      bool               destroyed;
69 
70      DirectLink        *globals;
71      FusionSkirmish    *globals_lock;
72 
73      FusionWorldShared *shared;
74 
75 #if !FUSION_BUILD_KERNEL
76      DirectLink        *listeners;  /* list of attached listeners */
77      FusionSkirmish     listeners_lock;
78 
79      FusionCall        *call;
80 #endif
81 };
82 
83 typedef struct {
84      DirectLink         link;
85 
86      int                magic;
87 
88      pthread_rwlock_t   lock;
89 
90      int                reactor_id;
91      FusionReactor     *reactor;
92 
93      DirectLink        *links; /* reactor listeners attached to node  */
94 } ReactorNode;
95 
96 typedef struct {
97      DirectLink         link;
98 
99      int                magic;
100 
101      Reaction          *reaction;
102      int                channel;
103 } NodeLink;
104 
105 /**************************************************************************************************/
106 
107 static ReactorNode *lock_node      ( int                 reactor_id,
108                                      bool                add_it,
109                                      bool                wlock,
110                                      FusionReactor      *reactor, /* one of reactor and world must not be NULL */
111                                      FusionWorld        *world );
112 
113 static void         unlock_node    ( ReactorNode        *node );
114 
115 static void         process_globals( FusionReactor      *reactor,
116                                      const void         *msg_data,
117                                      const ReactionFunc *globals );
118 
119 /**************************************************************************************************/
120 
121 #if FUSION_BUILD_KERNEL
122 
123 FusionReactor *
fusion_reactor_new(int msg_size,const char * name,const FusionWorld * world)124 fusion_reactor_new( int                msg_size,
125                     const char        *name,
126                     const FusionWorld *world )
127 {
128      FusionEntryInfo    info;
129      FusionReactor     *reactor;
130      FusionWorldShared *shared;
131 
132      D_ASSERT( name != NULL );
133      D_MAGIC_ASSERT( world, FusionWorld );
134 
135      shared = world->shared;
136 
137      D_MAGIC_ASSERT( shared, FusionWorldShared );
138 
139      D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_new( '%s', size %d )\n", name ? : "", msg_size );
140 
141      /* allocate shared reactor data */
142      reactor = SHCALLOC( shared->main_pool, 1, sizeof (FusionReactor) );
143      if (!reactor) {
144           D_OOSHM();
145           return NULL;
146      }
147 
148      /* create a new reactor */
149      while (ioctl( world->fusion_fd, FUSION_REACTOR_NEW, &reactor->id )) {
150           if (errno == EINTR)
151               continue;
152 
153           D_PERROR( "FUSION_REACTOR_NEW" );
154           SHFREE( shared->main_pool, reactor );
155           return NULL;
156      }
157 
158      /* set the static message size, should we make dynamic? (TODO?) */
159      reactor->msg_size = msg_size;
160 
161      /* Set default lock for global reactions. */
162      reactor->globals_lock = &shared->reactor_globals;
163 
164      D_DEBUG_AT( Fusion_Reactor, "  -> new reactor %p [%d] with lock %p [%d]\n",
165                  reactor, reactor->id, reactor->globals_lock, reactor->globals_lock->multi.id );
166 
167      reactor->shared = shared;
168      reactor->direct = true;
169 
170      D_MAGIC_SET( reactor, FusionReactor );
171 
172 
173      info.type = FT_REACTOR;
174      info.id   = reactor->id;
175 
176      direct_snputs( info.name, name, sizeof(info.name) );
177 
178      ioctl( world->fusion_fd, FUSION_ENTRY_SET_INFO, &info );
179 
180      return reactor;
181 }
182 
183 DirectResult
fusion_reactor_destroy(FusionReactor * reactor)184 fusion_reactor_destroy( FusionReactor *reactor )
185 {
186      FusionWorldShared *shared;
187 
188      D_MAGIC_ASSERT( reactor, FusionReactor );
189 
190      shared = reactor->shared;
191 
192      D_MAGIC_ASSERT( shared, FusionWorldShared );
193 
194      D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_destroy( %p [%d] )\n", reactor, reactor->id );
195 
196      D_ASSUME( !reactor->destroyed );
197 
198      if (reactor->destroyed)
199           return DR_DESTROYED;
200 
201      while (ioctl( _fusion_fd( shared ), FUSION_REACTOR_DESTROY, &reactor->id )) {
202           switch (errno) {
203                case EINTR:
204                     continue;
205 
206                case EINVAL:
207                     D_ERROR( "Fusion/Reactor: invalid reactor\n" );
208                     return DR_DESTROYED;
209           }
210 
211           D_PERROR( "FUSION_REACTOR_DESTROY" );
212           return DR_FUSION;
213      }
214 
215      reactor->destroyed = true;
216 
217      return DR_OK;
218 }
219 
220 DirectResult
fusion_reactor_free(FusionReactor * reactor)221 fusion_reactor_free( FusionReactor *reactor )
222 {
223      FusionWorldShared *shared;
224 
225      D_MAGIC_ASSERT( reactor, FusionReactor );
226 
227      shared = reactor->shared;
228 
229      D_MAGIC_ASSERT( shared, FusionWorldShared );
230 
231      D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_free( %p [%d] )\n", reactor, reactor->id );
232 
233      D_MAGIC_CLEAR( reactor );
234 
235 //     D_ASSUME( reactor->destroyed );
236 
237      if (!reactor->destroyed)
238           while (ioctl( _fusion_fd( shared ), FUSION_REACTOR_DESTROY, &reactor->id ) && errno == EINTR);
239 
240      /* free shared reactor data */
241      SHFREE( shared->main_pool, reactor );
242 
243      return DR_OK;
244 }
245 
246 DirectResult
fusion_reactor_attach_channel(FusionReactor * reactor,int channel,ReactionFunc func,void * ctx,Reaction * reaction)247 fusion_reactor_attach_channel( FusionReactor *reactor,
248                                int            channel,
249                                ReactionFunc   func,
250                                void          *ctx,
251                                Reaction      *reaction )
252 {
253      ReactorNode         *node;
254      NodeLink            *link;
255      FusionReactorAttach  attach;
256 
257      D_MAGIC_ASSERT( reactor, FusionReactor );
258      D_ASSERT( func != NULL );
259      D_ASSERT( reaction != NULL );
260 
261      D_DEBUG_AT( Fusion_Reactor,
262                  "fusion_reactor_attach( %p [%d], func %p, ctx %p, reaction %p )\n",
263                  reactor, reactor->id, func, ctx, reaction );
264 
265      link = D_CALLOC( 1, sizeof(NodeLink) );
266      if (!link)
267           return D_OOM();
268 
269      node = lock_node( reactor->id, true, true, reactor, NULL );
270      if (!node) {
271           D_FREE( link );
272           return DR_FUSION;
273      }
274 
275      attach.reactor_id = reactor->id;
276      attach.channel    = channel;
277 
278      while (ioctl( _fusion_fd( reactor->shared ), FUSION_REACTOR_ATTACH, &attach )) {
279           switch (errno) {
280                case EINTR:
281                     continue;
282 
283                case EINVAL:
284                     D_ERROR( "Fusion/Reactor: invalid reactor\n" );
285                     unlock_node( node );
286                     D_FREE( link );
287                     return DR_DESTROYED;
288           }
289 
290           D_PERROR( "FUSION_REACTOR_ATTACH" );
291           unlock_node( node );
292           D_FREE( link );
293           return DR_FUSION;
294      }
295 
296      /* fill out callback information */
297      reaction->func      = func;
298      reaction->ctx       = ctx;
299      reaction->node_link = link;
300 
301      link->reaction = reaction;
302      link->channel  = channel;
303 
304      D_MAGIC_SET( link, NodeLink );
305 
306      /* prepend the reaction to the local reaction list */
307      direct_list_prepend( &node->links, &link->link );
308 
309      unlock_node( node );
310 
311      return DR_OK;
312 }
313 
314 static void
remove_node_link(ReactorNode * node,NodeLink * link)315 remove_node_link( ReactorNode *node,
316                   NodeLink    *link )
317 {
318      D_MAGIC_ASSERT( node, ReactorNode );
319      D_MAGIC_ASSERT( link, NodeLink );
320 
321      D_ASSUME( link->reaction == NULL );
322 
323      direct_list_remove( &node->links, &link->link );
324 
325      D_MAGIC_CLEAR( link );
326 
327      D_FREE( link );
328 }
329 
330 DirectResult
fusion_reactor_detach(FusionReactor * reactor,Reaction * reaction)331 fusion_reactor_detach( FusionReactor *reactor,
332                        Reaction      *reaction )
333 {
334      ReactorNode *node;
335      NodeLink    *link;
336 
337      D_MAGIC_ASSERT( reactor, FusionReactor );
338      D_ASSERT( reaction != NULL );
339 
340      D_DEBUG_AT( Fusion_Reactor,
341                  "fusion_reactor_detach( %p [%d], reaction %p ) <- func %p, ctx %p\n",
342                  reactor, reactor->id, reaction, reaction->func, reaction->ctx );
343 
344      node = lock_node( reactor->id, false, true, reactor, NULL );
345      if (!node) {
346           D_BUG( "node not found" );
347           return DR_BUG;
348      }
349 
350      link = reaction->node_link;
351      D_ASSUME( link != NULL );
352 
353      if (link) {
354           FusionReactorDetach detach;
355 
356           D_ASSERT( link->reaction == reaction );
357 
358           detach.reactor_id = reactor->id;
359           detach.channel    = link->channel;
360 
361           reaction->node_link = NULL;
362 
363           link->reaction = NULL;
364 
365           remove_node_link( node, link );
366 
367           while (ioctl( _fusion_fd( reactor->shared ), FUSION_REACTOR_DETACH, &detach )) {
368                switch (errno) {
369                     case EINTR:
370                          continue;
371 
372                     case EINVAL:
373                          D_ERROR( "Fusion/Reactor: invalid reactor\n" );
374                          unlock_node( node );
375                          return DR_DESTROYED;
376                }
377 
378                D_PERROR( "FUSION_REACTOR_DETACH" );
379                unlock_node( node );
380                return DR_FUSION;
381           }
382      }
383 
384      unlock_node( node );
385 
386      return DR_OK;
387 }
388 
389 DirectResult
fusion_reactor_dispatch_channel(FusionReactor * reactor,int channel,const void * msg_data,int msg_size,bool self,const ReactionFunc * globals)390 fusion_reactor_dispatch_channel( FusionReactor      *reactor,
391                                  int                 channel,
392                                  const void         *msg_data,
393                                  int                 msg_size,
394                                  bool                self,
395                                  const ReactionFunc *globals )
396 {
397      FusionReactorDispatch dispatch;
398 
399      D_MAGIC_ASSERT( reactor, FusionReactor );
400 
401      D_ASSERT( msg_data != NULL );
402 
403      D_DEBUG_AT( Fusion_Reactor,
404                  "fusion_reactor_dispatch( %p [%d], msg_data %p, self %s, globals %p)\n",
405                  reactor, reactor->id, msg_data, self ? "true" : "false", globals );
406 
407      /* Handle global reactions first. */
408      if (reactor->globals) {
409           if (globals)
410                process_globals( reactor, msg_data, globals );
411           else
412                D_ERROR( "Fusion/Reactor: global reactions exist but no "
413                         "globals have been passed to dispatch()\n" );
414      }
415 
416      /* Handle local reactions. */
417      if (self && reactor->direct) {
418           _fusion_reactor_process_message( _fusion_world(reactor->shared), reactor->id, channel, msg_data );
419           self = false;
420      }
421 
422      /* Initialize dispatch data. */
423      dispatch.reactor_id = reactor->id;
424      dispatch.channel    = channel;
425      dispatch.self       = self;
426      dispatch.msg_size   = msg_size;
427      dispatch.msg_data   = msg_data;
428 
429      /* Dispatch the message to handle foreign reactions. */
430      while (ioctl( _fusion_fd( reactor->shared ), FUSION_REACTOR_DISPATCH, &dispatch )) {
431           switch (errno) {
432                case EINTR:
433                     continue;
434 
435                case EINVAL:
436                     D_ERROR( "Fusion/Reactor: invalid reactor\n" );
437                     return DR_DESTROYED;
438           }
439 
440           D_PERROR( "FUSION_REACTOR_DISPATCH" );
441           return DR_FUSION;
442      }
443 
444      return DR_OK;
445 }
446 
447 DirectResult
fusion_reactor_set_dispatch_callback(FusionReactor * reactor,FusionCall * call,void * call_ptr)448 fusion_reactor_set_dispatch_callback( FusionReactor  *reactor,
449                                       FusionCall     *call,
450                                       void           *call_ptr )
451 {
452      FusionReactorSetCallback callback;
453 
454      D_MAGIC_ASSERT( reactor, FusionReactor );
455      D_ASSERT( call != NULL );
456 
457      D_DEBUG_AT( Fusion_Reactor,
458                  "fusion_reactor_set_dispatch_callback( %p [%d], call %p [%d], ptr %p)\n",
459                  reactor, reactor->id, call, call->call_id, call_ptr );
460 
461      /* Fill callback info. */
462      callback.reactor_id = reactor->id;
463      callback.call_id    = call->call_id;
464      callback.call_ptr   = call_ptr;
465 
466      /* Set the dispatch callback. */
467      while (ioctl( _fusion_fd( reactor->shared ), FUSION_REACTOR_SET_DISPATCH_CALLBACK, &callback )) {
468           switch (errno) {
469                case EINTR:
470                     continue;
471 
472                case EINVAL:
473                     D_ERROR( "Fusion/Reactor: invalid reactor\n" );
474                     return DR_DESTROYED;
475           }
476 
477           D_PERROR( "FUSION_REACTOR_SET_DISPATCH_CALLBACK" );
478           return DR_FUSION;
479      }
480 
481      return DR_OK;
482 }
483 
484 DirectResult
fusion_reactor_set_name(FusionReactor * reactor,const char * name)485 fusion_reactor_set_name( FusionReactor *reactor,
486                          const char    *name )
487 {
488      FusionEntryInfo info;
489 
490      D_MAGIC_ASSERT( reactor, FusionReactor );
491      D_ASSERT( name != NULL );
492 
493      D_DEBUG_AT( Fusion_Reactor, "%s( %p, '%s' )\n", __FUNCTION__, reactor, name );
494 
495      /* Initialize reactor info. */
496      info.type = FT_REACTOR;
497      info.id   = reactor->id;
498 
499      /* Put reactor name into info. */
500      direct_snputs( info.name, name, sizeof(info.name) );
501 
502      /* Set the reactor info. */
503      while (ioctl( _fusion_fd( reactor->shared ), FUSION_ENTRY_SET_INFO, &info )) {
504           switch (errno) {
505                case EINTR:
506                     continue;
507 
508                case EINVAL:
509                     D_ERROR( "Fusion/Reactor: invalid reactor\n" );
510                     return DR_IDNOTFOUND;
511           }
512 
513           D_PERROR( "FUSION_ENTRY_SET_INFO( reactor 0x%08x, '%s' )\n", reactor->id, name );
514           return DR_FUSION;
515      }
516 
517      return DR_OK;
518 }
519 
520 DirectResult
fusion_reactor_add_permissions(FusionReactor * reactor,FusionID fusion_id,FusionReactorPermissions reactor_permissions)521 fusion_reactor_add_permissions( FusionReactor            *reactor,
522                                 FusionID                  fusion_id,
523                                 FusionReactorPermissions  reactor_permissions )
524 {
525      FusionEntryPermissions permissions;
526 
527      permissions.type        = FT_REACTOR;
528      permissions.id          = reactor->id;
529      permissions.fusion_id   = fusion_id;
530      permissions.permissions = 0;
531 
532      if (reactor_permissions & FUSION_REACTOR_PERMIT_ATTACH_DETACH) {
533           FUSION_ENTRY_PERMISSIONS_ADD( permissions.permissions, FUSION_REACTOR_ATTACH );
534           FUSION_ENTRY_PERMISSIONS_ADD( permissions.permissions, FUSION_REACTOR_DETACH );
535      }
536 
537      if (reactor_permissions & FUSION_REACTOR_PERMIT_DISPATCH)
538           FUSION_ENTRY_PERMISSIONS_ADD( permissions.permissions, FUSION_REACTOR_DISPATCH );
539 
540      while (ioctl( _fusion_fd( reactor->shared ), FUSION_ENTRY_ADD_PERMISSIONS, &permissions ) < 0) {
541           if (errno != EINTR) {
542                D_PERROR( "Fusion/Reactor: FUSION_ENTRY_ADD_PERMISSIONS( id %d ) failed!\n", reactor->id );
543                return DR_FAILURE;
544           }
545      }
546 
547      return DR_OK;
548 }
549 
550 void
_fusion_reactor_process_message(FusionWorld * world,int reactor_id,int channel,const void * msg_data)551 _fusion_reactor_process_message( FusionWorld *world,
552                                  int          reactor_id,
553                                  int          channel,
554                                  const void  *msg_data )
555 {
556      ReactorNode *node;
557      NodeLink    *link;
558 
559      D_MAGIC_ASSERT( world, FusionWorld );
560      D_ASSERT( msg_data != NULL );
561 
562      D_DEBUG_AT( Fusion_Reactor,
563                  "  _fusion_reactor_process_message( [%d], msg_data %p )\n", reactor_id, msg_data );
564 
565      /* Find the local counter part of the reactor. */
566      node = lock_node( reactor_id, false, false, NULL, world );
567      if (!node)
568           return;
569 
570      D_DEBUG_AT( Fusion_Reactor, "    -> node %p, reactor %p\n", node, node->reactor );
571 
572      D_ASSUME( node->links != NULL );
573 
574      if (!node->links) {
575           D_DEBUG_AT( Fusion_Reactor, "    -> no local reactions!?!\n" );
576           unlock_node( node );
577           return;
578      }
579 
580      direct_list_foreach (link, node->links) {
581           Reaction *reaction;
582 
583           D_MAGIC_ASSERT( link, NodeLink );
584 
585           if (link->channel != channel)
586                continue;
587 
588           reaction = link->reaction;
589           if (!reaction)
590                continue;
591 
592           if (reaction->func( msg_data, reaction->ctx ) == RS_REMOVE) {
593                FusionReactorDetach detach;
594 
595                detach.reactor_id = reactor_id;
596                detach.channel    = channel;
597 
598                D_DEBUG_AT( Fusion_Reactor, "    -> removing %p, func %p, ctx %p\n",
599                            reaction, reaction->func, reaction->ctx );
600 
601                link->reaction = NULL;
602 
603                /* We can't remove the link as we only have read lock, to avoid dead locks. */
604 
605                while (ioctl( world->fusion_fd, FUSION_REACTOR_DETACH, &detach )) {
606                     switch (errno) {
607                          case EINTR:
608                               continue;
609 
610                          case EINVAL:
611                               D_ERROR( "Fusion/Reactor: invalid reactor (DETACH)\n" );
612                               break;
613 
614                          default:
615                               D_PERROR( "FUSION_REACTOR_DETACH" );
616                               break;
617                     }
618 
619                     break;
620                }
621           }
622      }
623 
624      unlock_node( node );
625 }
626 
627 #else /* FUSION_BUILD_KERNEL */
628 
629 typedef struct {
630      DirectLink    link;
631 
632      unsigned int  refs;
633 
634      FusionID      fusion_id;
635      int           channel;
636 } __Listener;
637 
638 
639 FusionReactor *
fusion_reactor_new(int msg_size,const char * name,const FusionWorld * world)640 fusion_reactor_new( int                msg_size,
641                     const char        *name,
642                     const FusionWorld *world )
643 {
644      FusionReactor     *reactor;
645      FusionWorldShared *shared;
646 
647      D_ASSERT( name != NULL );
648      D_MAGIC_ASSERT( world, FusionWorld );
649 
650      shared = world->shared;
651 
652      D_MAGIC_ASSERT( shared, FusionWorldShared );
653 
654      D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_new( '%s', size %d )\n", name ? : "", msg_size );
655 
656      /* allocate shared reactor data */
657      reactor = SHCALLOC( shared->main_pool, 1, sizeof (FusionReactor) );
658      if (!reactor) {
659           D_OOSHM();
660           return NULL;
661      }
662 
663      /* Generate the reactor id */
664      reactor->id = ++shared->reactor_ids;
665 
666      /* Set the static message size, should we make dynamic? (TODO?) */
667      reactor->msg_size = msg_size;
668 
669      /* Set default lock for global reactions. */
670      reactor->globals_lock = &shared->reactor_globals;
671 
672      fusion_skirmish_init( &reactor->listeners_lock, "Reactor Listeners", world );
673 
674      D_DEBUG_AT( Fusion_Reactor, "  -> new reactor %p [%d] with lock %p [%d]\n",
675                  reactor, reactor->id, reactor->globals_lock, reactor->globals_lock->multi.id );
676 
677      reactor->shared = shared;
678      reactor->direct = true;
679 
680      D_MAGIC_SET( reactor, FusionReactor );
681 
682      return reactor;
683 }
684 
685 DirectResult
fusion_reactor_destroy(FusionReactor * reactor)686 fusion_reactor_destroy( FusionReactor *reactor )
687 {
688      FusionWorldShared *shared;
689 
690      D_MAGIC_ASSERT( reactor, FusionReactor );
691 
692      shared = reactor->shared;
693 
694      D_MAGIC_ASSERT( shared, FusionWorldShared );
695 
696      D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_destroy( %p [%d] )\n", reactor, reactor->id );
697 
698      D_ASSUME( !reactor->destroyed );
699 
700      if (reactor->destroyed)
701           return DR_DESTROYED;
702 
703      fusion_skirmish_destroy( &reactor->listeners_lock );
704 
705      reactor->destroyed = true;
706 
707      return DR_OK;
708 }
709 
710 DirectResult
fusion_reactor_free(FusionReactor * reactor)711 fusion_reactor_free( FusionReactor *reactor )
712 {
713      FusionWorldShared *shared;
714      __Listener        *listener, *temp;
715 
716      D_MAGIC_ASSERT( reactor, FusionReactor );
717 
718      shared = reactor->shared;
719 
720      D_MAGIC_ASSERT( shared, FusionWorldShared );
721 
722      D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_free( %p [%d] )\n", reactor, reactor->id );
723 
724      D_MAGIC_CLEAR( reactor );
725 
726 //     D_ASSUME( reactor->destroyed );
727 
728      direct_list_foreach_safe (listener, temp, reactor->listeners) {
729           direct_list_remove( &reactor->listeners, &listener->link );
730           SHFREE( shared->main_pool, listener );
731      }
732 
733      /* free shared reactor data */
734      SHFREE( shared->main_pool, reactor );
735 
736      return DR_OK;
737 }
738 
739 DirectResult
fusion_reactor_attach_channel(FusionReactor * reactor,int channel,ReactionFunc func,void * ctx,Reaction * reaction)740 fusion_reactor_attach_channel( FusionReactor *reactor,
741                                int            channel,
742                                ReactionFunc   func,
743                                void          *ctx,
744                                Reaction      *reaction )
745 {
746      FusionWorldShared *shared;
747      ReactorNode       *node;
748      NodeLink          *link;
749      FusionID           fusion_id;
750      __Listener        *listener;
751 
752      D_MAGIC_ASSERT( reactor, FusionReactor );
753      D_ASSERT( func != NULL );
754      D_ASSERT( reaction != NULL );
755 
756      D_DEBUG_AT( Fusion_Reactor,
757                  "fusion_reactor_attach( %p [%d], func %p, ctx %p, reaction %p )\n",
758                  reactor, reactor->id, func, ctx, reaction );
759 
760      if (reactor->destroyed)
761           return DR_DESTROYED;
762 
763      shared = reactor->shared;
764 
765      link = D_CALLOC( 1, sizeof(NodeLink) );
766      if (!link)
767           return D_OOM();
768 
769      node = lock_node( reactor->id, true, true, reactor, NULL );
770      if (!node) {
771           D_FREE( link );
772           return DR_FUSION;
773      }
774 
775      fusion_id = _fusion_id( shared );
776 
777      fusion_skirmish_prevail( &reactor->listeners_lock );
778 
779      direct_list_foreach (listener, reactor->listeners) {
780           if (listener->fusion_id == fusion_id && listener->channel == channel) {
781                listener->refs++;
782                break;
783           }
784      }
785 
786      if (!listener) {
787           listener = SHCALLOC( shared->main_pool, 1, sizeof(__Listener) );
788           if (!listener) {
789                D_OOSHM();
790                fusion_skirmish_dismiss( &reactor->listeners_lock );
791                unlock_node( node );
792                D_FREE( link );
793                return DR_NOSHAREDMEMORY;
794           }
795 
796           listener->refs      = 1;
797           listener->fusion_id = fusion_id;
798           listener->channel   = channel;
799 
800           direct_list_append( &reactor->listeners, &listener->link );
801      }
802 
803      fusion_skirmish_dismiss( &reactor->listeners_lock );
804 
805      /* fill out callback information */
806      reaction->func      = func;
807      reaction->ctx       = ctx;
808      reaction->node_link = link;
809 
810      link->reaction = reaction;
811      link->channel  = channel;
812 
813      D_MAGIC_SET( link, NodeLink );
814 
815      /* prepend the reaction to the local reaction list */
816      direct_list_prepend( &node->links, &link->link );
817 
818      unlock_node( node );
819 
820      return DR_OK;
821 }
822 
823 static void
remove_node_link(ReactorNode * node,NodeLink * link)824 remove_node_link( ReactorNode *node,
825                   NodeLink    *link )
826 {
827      D_MAGIC_ASSERT( node, ReactorNode );
828      D_MAGIC_ASSERT( link, NodeLink );
829 
830      D_ASSUME( link->reaction == NULL );
831 
832      direct_list_remove( &node->links, &link->link );
833 
834      D_MAGIC_CLEAR( link );
835 
836      D_FREE( link );
837 }
838 
839 DirectResult
fusion_reactor_detach(FusionReactor * reactor,Reaction * reaction)840 fusion_reactor_detach( FusionReactor *reactor,
841                        Reaction      *reaction )
842 {
843      FusionWorldShared *shared;
844      ReactorNode       *node;
845      NodeLink          *link;
846 
847      D_MAGIC_ASSERT( reactor, FusionReactor );
848      D_ASSERT( reaction != NULL );
849 
850      D_DEBUG_AT( Fusion_Reactor,
851                  "fusion_reactor_detach( %p [%d], reaction %p ) <- func %p, ctx %p\n",
852                  reactor, reactor->id, reaction, reaction->func, reaction->ctx );
853 
854      if (reactor->destroyed)
855           return DR_DESTROYED;
856 
857      shared = reactor->shared;
858 
859      node = lock_node( reactor->id, false, true, reactor, NULL );
860      if (!node) {
861           D_BUG( "node not found" );
862           return DR_BUG;
863      }
864 
865      link = reaction->node_link;
866      D_ASSUME( link != NULL );
867 
868      if (link) {
869           __Listener *listener;
870           FusionID    fusion_id = _fusion_id( shared );
871 
872           D_ASSERT( link->reaction == reaction );
873 
874           reaction->node_link = NULL;
875 
876           link->reaction = NULL;
877 
878           remove_node_link( node, link );
879 
880           fusion_skirmish_prevail( &reactor->listeners_lock );
881 
882           direct_list_foreach (listener, reactor->listeners) {
883                if (listener->fusion_id == fusion_id && listener->channel == link->channel) {
884                     if (--listener->refs == 0) {
885                          direct_list_remove( &reactor->listeners, &listener->link );
886                          SHFREE( shared->main_pool, listener );
887                     }
888                     break;
889                }
890           }
891 
892           fusion_skirmish_dismiss( &reactor->listeners_lock );
893 
894           if (!listener)
895                D_ERROR( "Fusion/Reactor: Couldn't detach listener!\n" );
896      }
897 
898      unlock_node( node );
899 
900      return DR_OK;
901 }
902 
903 DirectResult
fusion_reactor_dispatch_channel(FusionReactor * reactor,int channel,const void * msg_data,int msg_size,bool self,const ReactionFunc * globals)904 fusion_reactor_dispatch_channel( FusionReactor      *reactor,
905                                  int                 channel,
906                                  const void         *msg_data,
907                                  int                 msg_size,
908                                  bool                self,
909                                  const ReactionFunc *globals )
910 {
911      FusionWorld           *world;
912      __Listener            *listener, *temp;
913      FusionRef             *ref = NULL;
914      FusionReactorMessage  *msg;
915      struct sockaddr_un     addr;
916      int                    len;
917 
918      D_MAGIC_ASSERT( reactor, FusionReactor );
919 
920      D_ASSERT( msg_data != NULL );
921 
922      D_DEBUG_AT( Fusion_Reactor,
923                  "fusion_reactor_dispatch( %p [%d], msg_data %p, self %s, globals %p)\n",
924                  reactor, reactor->id, msg_data, self ? "true" : "false", globals );
925 
926      if (reactor->destroyed)
927           return DR_DESTROYED;
928 
929      if (msg_size > FUSION_MESSAGE_SIZE-sizeof(FusionReactorMessage)) {
930           D_ERROR( "Fusion/Reactor: Message too large (%d)!\n", msg_size );
931           return DR_UNSUPPORTED;
932      }
933 
934      world = _fusion_world( reactor->shared );
935 
936      if (reactor->call) {
937           ref = SHMALLOC( world->shared->main_pool, sizeof(FusionRef) );
938           if (!ref)
939                return D_OOSHM();
940 
941           fusion_ref_init( ref, "Dispatch Ref", world );
942           fusion_ref_up( ref, true );
943           fusion_ref_watch( ref, reactor->call, 0 );
944      }
945 
946      /* Handle global reactions first. */
947      if (reactor->globals) {
948           if (globals)
949                process_globals( reactor, msg_data, globals );
950           else
951                D_ERROR( "Fusion/Reactor: global reactions exist but no "
952                         "globals have been passed to dispatch()\n" );
953      }
954 
955      /* Handle local reactions. */
956      if (self && reactor->direct) {
957           _fusion_reactor_process_message( _fusion_world(reactor->shared), reactor->id, channel, msg_data );
958           self = false;
959      }
960 
961      msg = alloca( sizeof(FusionReactorMessage) + msg_size );
962 
963      msg->type    = FMT_REACTOR;
964      msg->id      = reactor->id;
965      msg->channel = channel;
966      msg->ref     = ref;
967 
968      memcpy( (void*)msg + sizeof(FusionReactorMessage), msg_data, msg_size );
969 
970      addr.sun_family = AF_UNIX;
971      len = snprintf( addr.sun_path, sizeof(addr.sun_path),
972                      "/tmp/.fusion-%d/", fusion_world_index( world ) );
973 
974      fusion_skirmish_prevail( &reactor->listeners_lock );
975 
976      direct_list_foreach_safe (listener, temp, reactor->listeners) {
977           if (listener->channel == channel) {
978                DirectResult ret;
979 
980                if (!self && listener->fusion_id == world->fusion_id)
981                     continue;
982 
983                if (ref)
984                     fusion_ref_up( ref, true );
985 
986                snprintf( addr.sun_path+len, sizeof(addr.sun_path)-len, "%lx", listener->fusion_id );
987 
988                D_DEBUG_AT( Fusion_Reactor, " -> sending to '%s'\n", addr.sun_path );
989 
990                ret = _fusion_send_message( world->fusion_fd, msg, sizeof(FusionReactorMessage)+msg_size, &addr );
991                if (ret == DR_FUSION) {
992                     D_DEBUG_AT( Fusion_Reactor, " -> removing dead listener %lu\n", listener->fusion_id );
993 
994                     if (ref)
995                          fusion_ref_down( ref, true );
996 
997                     direct_list_remove( &reactor->listeners, &listener->link );
998 
999                     SHFREE( reactor->shared->main_pool, listener );
1000                }
1001           }
1002      }
1003 
1004      fusion_skirmish_dismiss( &reactor->listeners_lock );
1005 
1006      if (ref) {
1007           fusion_ref_down( ref, true );
1008           if (fusion_ref_zero_trylock( ref ) == DR_OK) {
1009                fusion_ref_destroy( ref );
1010                SHFREE( world->shared->main_pool, ref );
1011           }
1012      }
1013 
1014      D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_dispatch( %p ) done.\n", reactor );
1015 
1016      return DR_OK;
1017 }
1018 
1019 DirectResult
fusion_reactor_set_dispatch_callback(FusionReactor * reactor,FusionCall * call,void * call_ptr)1020 fusion_reactor_set_dispatch_callback( FusionReactor  *reactor,
1021                                       FusionCall     *call,
1022                                       void           *call_ptr )
1023 {
1024      D_MAGIC_ASSERT( reactor, FusionReactor );
1025      D_ASSERT( call != NULL );
1026 
1027      D_DEBUG_AT( Fusion_Reactor,
1028                  "fusion_reactor_set_dispatch_callback( %p [%d], call %p [%d], ptr %p)\n",
1029                  reactor, reactor->id, call, call->call_id, call_ptr );
1030 
1031      if (reactor->destroyed)
1032           return DR_DESTROYED;
1033 
1034      if (call_ptr)
1035           return DR_UNIMPLEMENTED;
1036 
1037      reactor->call = call;
1038 
1039      return DR_OK;
1040 }
1041 
1042 DirectResult
fusion_reactor_set_name(FusionReactor * reactor,const char * name)1043 fusion_reactor_set_name( FusionReactor *reactor,
1044                          const char    *name )
1045 {
1046      D_UNIMPLEMENTED();
1047 
1048      return DR_UNIMPLEMENTED;
1049 }
1050 
1051 DirectResult
fusion_reactor_add_permissions(FusionReactor * reactor,FusionID fusion_id,FusionReactorPermissions permissions)1052 fusion_reactor_add_permissions( FusionReactor            *reactor,
1053                                 FusionID                  fusion_id,
1054                                 FusionReactorPermissions  permissions )
1055 {
1056      D_UNIMPLEMENTED();
1057 
1058      return DR_UNIMPLEMENTED;
1059 }
1060 
1061 void
_fusion_reactor_process_message(FusionWorld * world,int reactor_id,int channel,const void * msg_data)1062 _fusion_reactor_process_message( FusionWorld *world,
1063                                  int          reactor_id,
1064                                  int          channel,
1065                                  const void  *msg_data )
1066 {
1067      ReactorNode *node;
1068      NodeLink    *link;
1069 
1070      D_MAGIC_ASSERT( world, FusionWorld );
1071      D_ASSERT( msg_data != NULL );
1072 
1073      D_DEBUG_AT( Fusion_Reactor,
1074                  "  _fusion_reactor_process_message( [%d], msg_data %p )\n", reactor_id, msg_data );
1075 
1076      /* Find the local counter part of the reactor. */
1077      node = lock_node( reactor_id, false, false, NULL, world );
1078      if (!node)
1079           return;
1080 
1081      D_DEBUG_AT( Fusion_Reactor, "    -> node %p, reactor %p\n", node, node->reactor );
1082 
1083      D_ASSUME( node->links != NULL );
1084 
1085      if (!node->links) {
1086           D_DEBUG_AT( Fusion_Reactor, "    -> no local reactions!?!\n" );
1087           unlock_node( node );
1088           return;
1089      }
1090 
1091      direct_list_foreach (link, node->links) {
1092           Reaction *reaction;
1093 
1094           D_MAGIC_ASSERT( link, NodeLink );
1095 
1096           if (link->channel != channel)
1097                continue;
1098 
1099           reaction = link->reaction;
1100           if (!reaction)
1101                continue;
1102 
1103           if (reaction->func( msg_data, reaction->ctx ) == RS_REMOVE) {
1104                FusionReactor *reactor = node->reactor;
1105                __Listener    *listener;
1106 
1107                D_DEBUG_AT( Fusion_Reactor, "    -> removing %p, func %p, ctx %p\n",
1108                            reaction, reaction->func, reaction->ctx );
1109 
1110                fusion_skirmish_prevail( &reactor->listeners_lock );
1111 
1112                direct_list_foreach (listener, reactor->listeners) {
1113                     if (listener->fusion_id == world->fusion_id && listener->channel == channel) {
1114                          if (--listener->refs == 0) {
1115                               direct_list_remove( &reactor->listeners, &listener->link );
1116                               SHFREE( world->shared->main_pool, listener );
1117                          }
1118                          break;
1119                     }
1120                }
1121 
1122                fusion_skirmish_dismiss( &reactor->listeners_lock );
1123           }
1124      }
1125 
1126      unlock_node( node );
1127 }
1128 
1129 #endif /* FUSION_BUILD_KERNEL */
1130 
1131 
1132 DirectResult
fusion_reactor_set_lock(FusionReactor * reactor,FusionSkirmish * lock)1133 fusion_reactor_set_lock( FusionReactor  *reactor,
1134                          FusionSkirmish *lock )
1135 {
1136      DirectResult    ret;
1137      FusionSkirmish *old;
1138 
1139      D_MAGIC_ASSERT( reactor, FusionReactor );
1140 
1141      old = reactor->globals_lock;
1142 
1143      D_ASSERT( lock != NULL );
1144      D_ASSERT( old != NULL );
1145 
1146      D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_set_lock( %p [%d], lock %p [%d] ) <- old %p [%d]\n",
1147                  reactor, reactor->id, lock, lock->multi.id, old, old->multi.id );
1148 
1149      /*
1150       * Acquire the old lock to make sure that changing the lock doesn't
1151       * result in mismatching lock/unlock pairs in other functions.
1152       */
1153      ret = fusion_skirmish_prevail( old );
1154      if (ret)
1155           return ret;
1156 
1157      D_ASSUME( reactor->globals_lock != lock );
1158 
1159      /* Set the lock replacement. */
1160      reactor->globals_lock = lock;
1161 
1162      /* Release the old lock which is obsolete now. */
1163      fusion_skirmish_dismiss( old );
1164 
1165      return DR_OK;
1166 }
1167 
1168 DirectResult
fusion_reactor_set_lock_only(FusionReactor * reactor,FusionSkirmish * lock)1169 fusion_reactor_set_lock_only( FusionReactor  *reactor,
1170                               FusionSkirmish *lock )
1171 {
1172      D_MAGIC_ASSERT( reactor, FusionReactor );
1173      D_ASSERT( lock != NULL );
1174 
1175      D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_set_lock_only( %p [%d], lock %p [%d] ) <- old %p [%d]\n",
1176                  reactor, reactor->id, lock, lock->multi.id, reactor->globals_lock, reactor->globals_lock->multi.id );
1177 
1178      D_ASSUME( reactor->globals_lock != lock );
1179 
1180      /* Set the lock replacement. */
1181      reactor->globals_lock = lock;
1182 
1183      return DR_OK;
1184 }
1185 
1186 DirectResult
fusion_reactor_attach(FusionReactor * reactor,ReactionFunc func,void * ctx,Reaction * reaction)1187 fusion_reactor_attach (FusionReactor *reactor,
1188                        ReactionFunc   func,
1189                        void          *ctx,
1190                        Reaction      *reaction)
1191 {
1192      D_MAGIC_ASSERT( reactor, FusionReactor );
1193      D_ASSERT( func != NULL );
1194      D_ASSERT( reaction != NULL );
1195 
1196      return fusion_reactor_attach_channel( reactor, 0, func, ctx, reaction );
1197 }
1198 
1199 DirectResult
fusion_reactor_attach_global(FusionReactor * reactor,int index,void * ctx,GlobalReaction * reaction)1200 fusion_reactor_attach_global( FusionReactor  *reactor,
1201                               int             index,
1202                               void           *ctx,
1203                               GlobalReaction *reaction )
1204 {
1205      DirectResult    ret;
1206      FusionSkirmish *lock;
1207 
1208      D_MAGIC_ASSERT( reactor, FusionReactor );
1209 
1210      D_ASSERT( index >= 0 );
1211      D_ASSERT( reaction != NULL );
1212 
1213      D_DEBUG_AT( Fusion_Reactor,
1214                  "fusion_reactor_attach_global( %p [%d], index %d, ctx %p, reaction %p )\n",
1215                  reactor, reactor->id, index, ctx, reaction );
1216 
1217      /* Initialize reaction data. */
1218      reaction->index    = index;
1219      reaction->ctx      = ctx;
1220      reaction->attached = true;
1221 
1222      /* Remember for safety. */
1223      lock = reactor->globals_lock;
1224 
1225      D_ASSERT( lock != NULL );
1226 
1227      /* Lock the list of global reactions. */
1228      ret = fusion_skirmish_prevail( lock );
1229      if (ret)
1230           return ret;
1231 
1232      /* FIXME: Might have changed while waiting for the lock. */
1233      if (lock != reactor->globals_lock)
1234           D_WARN( "using old lock once more" );
1235 
1236      /* Prepend the reaction to the list. */
1237      direct_list_prepend( &reactor->globals, &reaction->link );
1238 
1239      /* Unlock the list of global reactions. */
1240      fusion_skirmish_dismiss( lock );
1241 
1242      return DR_OK;
1243 }
1244 
1245 DirectResult
fusion_reactor_detach_global(FusionReactor * reactor,GlobalReaction * reaction)1246 fusion_reactor_detach_global( FusionReactor  *reactor,
1247                               GlobalReaction *reaction )
1248 {
1249      DirectResult    ret;
1250      FusionSkirmish *lock;
1251 
1252      D_MAGIC_ASSERT( reactor, FusionReactor );
1253 
1254      D_ASSERT( reaction != NULL );
1255 
1256      D_DEBUG_AT( Fusion_Reactor,
1257                  "fusion_reactor_detach_global( %p [%d], reaction %p ) <- index %d, ctx %p\n",
1258                  reactor, reactor->id, reaction, reaction->index, reaction->ctx );
1259 
1260      /* Remember for safety. */
1261      lock = reactor->globals_lock;
1262 
1263      D_ASSERT( lock != NULL );
1264 
1265      /* Lock the list of global reactions. */
1266      ret = fusion_skirmish_prevail( lock );
1267      if (ret)
1268           return ret;
1269 
1270      /* FIXME: Might have changed while waiting for the lock. */
1271      if (lock != reactor->globals_lock)
1272           D_WARN( "using old lock once more" );
1273 
1274      // Do not assume the reaction is attached, because it is possible that it may not be.
1275      //D_ASSUME( reaction->attached );
1276 
1277      /* Check against multiple detach. */
1278      if (reaction->attached) {
1279           /* Mark as detached. */
1280           reaction->attached = false;
1281 
1282           /* Remove the reaction from the list. */
1283           direct_list_remove( &reactor->globals, &reaction->link );
1284      }
1285 
1286      /* Unlock the list of global reactions. */
1287      fusion_skirmish_dismiss( lock );
1288 
1289      return DR_OK;
1290 }
1291 
1292 DirectResult
fusion_reactor_dispatch(FusionReactor * reactor,const void * msg_data,bool self,const ReactionFunc * globals)1293 fusion_reactor_dispatch( FusionReactor      *reactor,
1294                          const void         *msg_data,
1295                          bool                self,
1296                          const ReactionFunc *globals )
1297 {
1298      D_MAGIC_ASSERT( reactor, FusionReactor );
1299 
1300      return fusion_reactor_dispatch_channel( reactor, 0, msg_data, reactor->msg_size, self, globals );
1301 }
1302 
1303 DirectResult
fusion_reactor_sized_dispatch(FusionReactor * reactor,const void * msg_data,int msg_size,bool self,const ReactionFunc * globals)1304 fusion_reactor_sized_dispatch( FusionReactor      *reactor,
1305                                const void         *msg_data,
1306                                int                 msg_size,
1307                                bool                self,
1308                                const ReactionFunc *globals )
1309 {
1310      D_MAGIC_ASSERT( reactor, FusionReactor );
1311 
1312      return fusion_reactor_dispatch_channel( reactor, 0, msg_data, msg_size, self, globals );
1313 }
1314 
1315 DirectResult
fusion_reactor_direct(FusionReactor * reactor,bool direct)1316 fusion_reactor_direct( FusionReactor *reactor, bool direct )
1317 {
1318      D_MAGIC_ASSERT( reactor, FusionReactor );
1319 
1320      reactor->direct = direct;
1321 
1322      return DR_OK;
1323 }
1324 
1325 
1326 void
_fusion_reactor_free_all(FusionWorld * world)1327 _fusion_reactor_free_all( FusionWorld *world )
1328 {
1329      ReactorNode *node, *node_temp;
1330 
1331      D_MAGIC_ASSERT( world, FusionWorld );
1332 
1333      D_DEBUG_AT( Fusion_Reactor, "_fusion_reactor_free_all() <- nodes %p\n", world->reactor_nodes );
1334 
1335 
1336      direct_mutex_lock( &world->reactor_nodes_lock );
1337 
1338      direct_list_foreach_safe (node, node_temp, world->reactor_nodes) {
1339           NodeLink *link, *link_temp;
1340 
1341           D_MAGIC_ASSERT( node, ReactorNode );
1342 
1343           pthread_rwlock_wrlock( &node->lock );
1344 
1345           direct_list_foreach_safe (link, link_temp, node->links) {
1346                D_MAGIC_ASSERT( link, NodeLink );
1347 
1348                D_MAGIC_CLEAR( link );
1349 
1350                D_FREE( link );
1351           }
1352 
1353           pthread_rwlock_unlock( &node->lock );
1354           pthread_rwlock_destroy( &node->lock );
1355 
1356           D_MAGIC_CLEAR( node );
1357 
1358           D_FREE( node );
1359      }
1360 
1361      world->reactor_nodes = NULL;
1362 
1363      direct_mutex_unlock( &world->reactor_nodes_lock );
1364 }
1365 
1366 static void
process_globals(FusionReactor * reactor,const void * msg_data,const ReactionFunc * globals)1367 process_globals( FusionReactor      *reactor,
1368                  const void         *msg_data,
1369                  const ReactionFunc *globals )
1370 {
1371      DirectLink     *n;
1372      GlobalReaction *global;
1373      FusionSkirmish *lock;
1374      int             max_index = -1;
1375 
1376      D_MAGIC_ASSERT( reactor, FusionReactor );
1377 
1378      D_ASSERT( msg_data != NULL );
1379      D_ASSERT( globals != NULL );
1380 
1381 /*     D_DEBUG_AT( Fusion_Reactor, "  process_globals( %p [%d], msg_data %p, globals %p )\n",
1382                  reactor, reactor->id, msg_data, globals );*/
1383 
1384      /* Find maximum reaction index. */
1385      while (globals[max_index+1])
1386           max_index++;
1387 
1388      if (max_index < 0)
1389           return;
1390 
1391      /* Remember for safety. */
1392      lock = reactor->globals_lock;
1393 
1394      D_ASSERT( lock != NULL );
1395 
1396      /* Lock the list of global reactions. */
1397      if (fusion_skirmish_prevail( lock ))
1398           return;
1399 
1400      /* FIXME: Might have changed while waiting for the lock. */
1401      if (lock != reactor->globals_lock)
1402           D_WARN( "using old lock once more" );
1403 
1404      /* Loop through all global reactions. */
1405      direct_list_foreach_safe (global, n, reactor->globals) {
1406           int index = global->index;
1407 
1408           /* Check if the index is valid. */
1409           if (index < 0 || index > max_index) {
1410                D_WARN( "index out of bounds (%d/%d)", global->index, max_index );
1411                continue;
1412           }
1413 
1414           /* Call reaction and remove it if requested. */
1415           if (globals[global->index]( msg_data, global->ctx ) == RS_REMOVE) {
1416                /*D_DEBUG_AT( Fusion_Reactor, "    -> removing %p, index %d, ctx %p\n",
1417                            global, global->index, global->ctx );*/
1418 
1419                // Mark as detached, since the global reaction is being removed
1420                // from the global reaction list.
1421                global->attached = false;
1422 
1423                direct_list_remove( &reactor->globals, &global->link );
1424           }
1425      }
1426 
1427      /* Unlock the list of global reactions. */
1428      fusion_skirmish_dismiss( lock );
1429 }
1430 
1431 
1432 
1433 /*****************************
1434  *  File internal functions  *
1435  *****************************/
1436 
1437 static ReactorNode *
lock_node(int reactor_id,bool add_it,bool wlock,FusionReactor * reactor,FusionWorld * world)1438 lock_node( int reactor_id, bool add_it, bool wlock, FusionReactor *reactor, FusionWorld *world )
1439 {
1440      DirectLink        *n;
1441      ReactorNode       *node;
1442      FusionWorldShared *shared;
1443 
1444      D_DEBUG_AT( Fusion_Reactor, "    lock_node( [%d], add %s, reactor %p )\n",
1445                  reactor_id, add_it ? "true" : "false", reactor );
1446 
1447      D_ASSERT( reactor != NULL || (!add_it && world != NULL) );
1448 
1449      if (reactor) {
1450           D_MAGIC_ASSERT( reactor, FusionReactor );
1451 
1452           shared = reactor->shared;
1453 
1454           D_MAGIC_ASSERT( shared, FusionWorldShared );
1455 
1456           world = _fusion_world( shared );
1457 
1458           D_MAGIC_ASSERT( world, FusionWorld );
1459      }
1460      else {
1461           D_MAGIC_ASSERT( world, FusionWorld );
1462 
1463           shared = world->shared;
1464 
1465           D_MAGIC_ASSERT( shared, FusionWorldShared );
1466      }
1467 
1468 
1469      direct_mutex_lock( &world->reactor_nodes_lock );
1470 
1471      direct_list_foreach_safe (node, n, world->reactor_nodes) {
1472           D_MAGIC_ASSERT( node, ReactorNode );
1473 
1474           if (node->reactor_id == reactor_id) {
1475                if (wlock) {
1476                     DirectLink *n;
1477                     NodeLink   *link;
1478 
1479                     pthread_rwlock_wrlock( &node->lock );
1480 
1481                     /* FIXME: don't cleanup asynchronously */
1482                     direct_list_foreach_safe (link, n, node->links) {
1483                          D_MAGIC_ASSERT( link, NodeLink );
1484 
1485                          if (!link->reaction) {
1486                               D_DEBUG_AT( Fusion_Reactor, "    -> cleaning up %p\n", link );
1487 
1488                               remove_node_link( node, link );
1489                          }
1490                          else
1491                               D_ASSERT( link->reaction->node_link == link );
1492                     }
1493                }
1494                else
1495                     pthread_rwlock_rdlock( &node->lock );
1496 
1497                /* FIXME: Don't cleanup asynchronously. */
1498                if (!node->links && !add_it) {
1499 //                    D_DEBUG_AT( Fusion_Reactor, "      -> cleaning up mine %p\n", node );
1500 
1501                     direct_list_remove( &world->reactor_nodes, &node->link );
1502 
1503                     pthread_rwlock_unlock( &node->lock );
1504                     pthread_rwlock_destroy( &node->lock );
1505 
1506                     D_MAGIC_CLEAR( node );
1507 
1508                     D_FREE( node );
1509 
1510                     node = NULL;
1511                }
1512                else {
1513 /*                    D_DEBUG_AT( Fusion_Reactor, "      -> found %p (%d reactions)\n",
1514                                 node, direct_list_count_elements_EXPENSIVE( node->reactions ) );*/
1515 
1516                     D_ASSERT( node->reactor == reactor || reactor == NULL );
1517 
1518                     direct_list_move_to_front( &world->reactor_nodes, &node->link );
1519                }
1520 
1521                direct_mutex_unlock( &world->reactor_nodes_lock );
1522 
1523                return node;
1524           }
1525 
1526           /* FIXME: Don't cleanup asynchronously. */
1527           if (!pthread_rwlock_trywrlock( &node->lock )) {
1528                if (!node->links) {
1529 //                    D_DEBUG_AT( Fusion_Reactor, "      -> cleaning up other %p\n", node );
1530 
1531                     direct_list_remove( &world->reactor_nodes, &node->link );
1532 
1533                     pthread_rwlock_unlock( &node->lock );
1534                     pthread_rwlock_destroy( &node->lock );
1535 
1536                     D_MAGIC_CLEAR( node );
1537 
1538                     D_FREE( node );
1539                }
1540                else {
1541                     /*D_DEBUG_AT( Fusion_Reactor, "      -> keeping other %p (%d reactions)\n",
1542                                 node, direct_list_count_elements_EXPENSIVE( node->reactions ) );*/
1543 
1544                     pthread_rwlock_unlock( &node->lock );
1545                }
1546           }
1547      }
1548 
1549 //     D_DEBUG_AT( Fusion_Reactor, "      -> not found%s adding\n", add_it ? ", but" : " and not" );
1550 
1551      if (add_it) {
1552           D_MAGIC_ASSERT( reactor, FusionReactor );
1553 
1554           node = D_CALLOC( 1, sizeof(ReactorNode) );
1555           if (!node) {
1556                D_OOM();
1557                return NULL;
1558           }
1559 
1560           //direct_util_recursive_pthread_mutex_init( &node->lock );
1561           pthread_rwlock_init( &node->lock, NULL );
1562 
1563 
1564           if (wlock)
1565                pthread_rwlock_wrlock( &node->lock );
1566           else
1567                pthread_rwlock_rdlock( &node->lock );
1568 
1569           node->reactor_id = reactor_id;
1570           node->reactor    = reactor;
1571 
1572           D_MAGIC_SET( node, ReactorNode );
1573 
1574           direct_list_prepend( &world->reactor_nodes, &node->link );
1575 
1576           direct_mutex_unlock( &world->reactor_nodes_lock );
1577 
1578           return node;
1579      }
1580 
1581      direct_mutex_unlock( &world->reactor_nodes_lock );
1582 
1583      return NULL;
1584 }
1585 
1586 static void
unlock_node(ReactorNode * node)1587 unlock_node( ReactorNode *node )
1588 {
1589      D_ASSERT( node != NULL );
1590 
1591 //     D_MAGIC_ASSERT( node->reactor, FusionReactor );
1592 
1593 /*     D_DEBUG_AT( Fusion_Reactor, "    unlock_node( %p, reactor %p [%d] )\n",
1594                  node, node->reactor, node->reactor->id );*/
1595 
1596      pthread_rwlock_unlock( &node->lock );
1597 }
1598 
1599 #else /* FUSION_BUILD_MULTI */
1600 
1601 /***************************
1602  *  Internal declarations  *
1603  ***************************/
1604 
1605 /*
1606  *
1607  */
1608 struct __Fusion_FusionReactor {
1609      DirectLink       *reactions; /* reactor listeners attached to node  */
1610      pthread_mutex_t   reactions_lock;
1611 
1612      DirectLink       *globals; /* global reactions attached to node  */
1613      pthread_mutex_t   globals_lock;
1614 
1615      bool              destroyed;
1616 
1617      int               msg_size;
1618 };
1619 
1620 static void
1621 process_globals( FusionReactor      *reactor,
1622                  const void         *msg_data,
1623                  const ReactionFunc *globals );
1624 
1625 /****************
1626  *  Public API  *
1627  ****************/
1628 
1629 FusionReactor *
fusion_reactor_new(int msg_size,const char * name,const FusionWorld * world)1630 fusion_reactor_new( int                msg_size,
1631                     const char        *name,
1632                     const FusionWorld *world )
1633 {
1634      FusionReactor *reactor;
1635 
1636      reactor = D_CALLOC( 1, sizeof(FusionReactor) );
1637      if (!reactor)
1638           return NULL;
1639 
1640      reactor->msg_size = msg_size;
1641 
1642      direct_util_recursive_pthread_mutex_init( &reactor->reactions_lock );
1643      direct_util_recursive_pthread_mutex_init( &reactor->globals_lock );
1644 
1645      return reactor;
1646 }
1647 
1648 DirectResult
fusion_reactor_set_lock(FusionReactor * reactor,FusionSkirmish * lock)1649 fusion_reactor_set_lock( FusionReactor  *reactor,
1650                          FusionSkirmish *lock )
1651 {
1652      D_ASSERT( reactor != NULL );
1653      D_ASSERT( lock != NULL );
1654 
1655 //     D_UNIMPLEMENTED();
1656 
1657      return DR_UNIMPLEMENTED;
1658 }
1659 
1660 DirectResult
fusion_reactor_set_lock_only(FusionReactor * reactor,FusionSkirmish * lock)1661 fusion_reactor_set_lock_only( FusionReactor  *reactor,
1662                               FusionSkirmish *lock )
1663 {
1664      D_ASSERT( reactor != NULL );
1665      D_ASSERT( lock != NULL );
1666 
1667      return DR_UNIMPLEMENTED;
1668 }
1669 
1670 DirectResult
fusion_reactor_attach(FusionReactor * reactor,ReactionFunc func,void * ctx,Reaction * reaction)1671 fusion_reactor_attach (FusionReactor *reactor,
1672                        ReactionFunc   func,
1673                        void          *ctx,
1674                        Reaction      *reaction)
1675 {
1676      return fusion_reactor_attach_channel( reactor, 0, func, ctx, reaction );
1677 }
1678 
1679 DirectResult
fusion_reactor_detach(FusionReactor * reactor,Reaction * reaction)1680 fusion_reactor_detach (FusionReactor *reactor,
1681                        Reaction      *reaction)
1682 {
1683      D_ASSERT( reactor != NULL );
1684      D_ASSERT( reaction != NULL );
1685 
1686      pthread_mutex_lock( &reactor->reactions_lock );
1687 
1688      direct_list_remove( &reactor->reactions, &reaction->link );
1689 
1690      pthread_mutex_unlock( &reactor->reactions_lock );
1691 
1692      return DR_OK;
1693 }
1694 
1695 DirectResult
fusion_reactor_attach_global(FusionReactor * reactor,int index,void * ctx,GlobalReaction * reaction)1696 fusion_reactor_attach_global (FusionReactor  *reactor,
1697                               int             index,
1698                               void           *ctx,
1699                               GlobalReaction *reaction)
1700 {
1701      D_ASSERT( reactor != NULL );
1702      D_ASSERT( index >= 0 );
1703      D_ASSERT( reaction != NULL );
1704 
1705      reaction->index = index;
1706      reaction->ctx   = ctx;
1707      // Mark the reaction as attached now.
1708      reaction->attached = true;
1709 
1710      pthread_mutex_lock( &reactor->globals_lock );
1711 
1712      direct_list_prepend( &reactor->globals, &reaction->link );
1713 
1714      pthread_mutex_unlock( &reactor->globals_lock );
1715 
1716      return DR_OK;
1717 }
1718 
1719 DirectResult
fusion_reactor_detach_global(FusionReactor * reactor,GlobalReaction * reaction)1720 fusion_reactor_detach_global (FusionReactor  *reactor,
1721                               GlobalReaction *reaction)
1722 {
1723      D_ASSERT( reactor != NULL );
1724      D_ASSERT( reaction != NULL );
1725 
1726      pthread_mutex_lock( &reactor->globals_lock );
1727 
1728      // Check to prevent multiple detaches from being performed.
1729      if (reaction->attached) {
1730           // Mark as detached, since the global reaction is being removed
1731           // from the global reaction list.
1732           reaction->attached = false;
1733 
1734           // Remove the reaction from the list.
1735           direct_list_remove( &reactor->globals, &reaction->link );
1736      }
1737 
1738      pthread_mutex_unlock( &reactor->globals_lock );
1739 
1740      return DR_OK;
1741 }
1742 
1743 DirectResult
fusion_reactor_attach_channel(FusionReactor * reactor,int channel,ReactionFunc func,void * ctx,Reaction * reaction)1744 fusion_reactor_attach_channel( FusionReactor *reactor,
1745                                int            channel,
1746                                ReactionFunc   func,
1747                                void          *ctx,
1748                                Reaction      *reaction )
1749 {
1750      D_ASSERT( reactor != NULL );
1751      D_ASSERT( func != NULL );
1752      D_ASSERT( reaction != NULL );
1753 
1754      reaction->func      = func;
1755      reaction->ctx       = ctx;
1756      reaction->node_link = (void*)(long) channel;
1757 
1758      pthread_mutex_lock( &reactor->reactions_lock );
1759 
1760      direct_list_prepend( &reactor->reactions, &reaction->link );
1761 
1762      pthread_mutex_unlock( &reactor->reactions_lock );
1763 
1764      return DR_OK;
1765 }
1766 
1767 DirectResult
fusion_reactor_dispatch_channel(FusionReactor * reactor,int channel,const void * msg_data,int msg_size,bool self,const ReactionFunc * globals)1768 fusion_reactor_dispatch_channel( FusionReactor      *reactor,
1769                                  int                 channel,
1770                                  const void         *msg_data,
1771                                  int                 msg_size,
1772                                  bool                self,
1773                                  const ReactionFunc *globals )
1774 {
1775      DirectLink *l;
1776 
1777      D_ASSERT( reactor != NULL );
1778      D_ASSERT( msg_data != NULL );
1779 
1780      if (reactor->globals) {
1781           if (globals)
1782                process_globals( reactor, msg_data, globals );
1783           else
1784                D_ERROR( "Fusion/Reactor: global reactions exist but no "
1785                         "globals have been passed to dispatch()\n" );
1786      }
1787 
1788      if (!self)
1789           return DR_OK;
1790 
1791      pthread_mutex_lock( &reactor->reactions_lock );
1792 
1793      l = reactor->reactions;
1794      while (l) {
1795           DirectLink *next     = l->next;
1796           Reaction   *reaction = (Reaction*) l;
1797 
1798           if ((long) reaction->node_link == channel) {
1799                switch (reaction->func( msg_data, reaction->ctx )) {
1800                     case RS_REMOVE:
1801                          direct_list_remove( &reactor->reactions, l );
1802                          break;
1803 
1804                     case RS_DROP:
1805                          pthread_mutex_unlock( &reactor->reactions_lock );
1806                          return DR_OK;
1807 
1808                     default:
1809                          break;
1810                }
1811           }
1812 
1813           l = next;
1814      }
1815 
1816      pthread_mutex_unlock( &reactor->reactions_lock );
1817 
1818      return DR_OK;
1819 }
1820 
1821 DirectResult
fusion_reactor_set_dispatch_callback(FusionReactor * reactor,FusionCall * call,void * call_ptr)1822 fusion_reactor_set_dispatch_callback( FusionReactor  *reactor,
1823                                       FusionCall     *call,
1824                                       void           *call_ptr )
1825 {
1826      D_UNIMPLEMENTED();
1827 
1828      return DR_UNIMPLEMENTED;
1829 }
1830 
1831 DirectResult
fusion_reactor_set_name(FusionReactor * reactor,const char * name)1832 fusion_reactor_set_name( FusionReactor *reactor,
1833                          const char    *name )
1834 {
1835      D_UNIMPLEMENTED();
1836 
1837      return DR_UNIMPLEMENTED;
1838 }
1839 
1840 DirectResult
fusion_reactor_dispatch(FusionReactor * reactor,const void * msg_data,bool self,const ReactionFunc * globals)1841 fusion_reactor_dispatch (FusionReactor      *reactor,
1842                          const void         *msg_data,
1843                          bool                self,
1844                          const ReactionFunc *globals)
1845 {
1846      return fusion_reactor_dispatch_channel( reactor, 0, msg_data, reactor->msg_size, self, globals );
1847 }
1848 
1849 DirectResult
fusion_reactor_direct(FusionReactor * reactor,bool direct)1850 fusion_reactor_direct( FusionReactor *reactor, bool direct )
1851 {
1852      D_ASSERT( reactor != NULL );
1853 
1854      return DR_OK;
1855 }
1856 
1857 DirectResult
fusion_reactor_destroy(FusionReactor * reactor)1858 fusion_reactor_destroy (FusionReactor *reactor)
1859 {
1860      D_ASSERT( reactor != NULL );
1861 
1862      D_ASSUME( !reactor->destroyed );
1863 
1864      reactor->destroyed = true;
1865 
1866      return DR_OK;
1867 }
1868 
1869 DirectResult
fusion_reactor_free(FusionReactor * reactor)1870 fusion_reactor_free (FusionReactor *reactor)
1871 {
1872      D_ASSERT( reactor != NULL );
1873 
1874 //     D_ASSUME( reactor->destroyed );
1875 
1876      reactor->reactions = NULL;
1877      pthread_mutex_destroy( &reactor->reactions_lock );
1878 
1879      reactor->globals = NULL;
1880      pthread_mutex_destroy( &reactor->globals_lock );
1881 
1882      D_FREE( reactor );
1883 
1884      return DR_OK;
1885 }
1886 
1887 /******************************************************************************/
1888 
1889 static void
process_globals(FusionReactor * reactor,const void * msg_data,const ReactionFunc * globals)1890 process_globals( FusionReactor      *reactor,
1891                  const void         *msg_data,
1892                  const ReactionFunc *globals )
1893 {
1894      DirectLink     *n;
1895      GlobalReaction *global;
1896      int             max_index = -1;
1897 
1898      D_ASSERT( reactor != NULL );
1899      D_ASSERT( msg_data != NULL );
1900      D_ASSERT( globals != NULL );
1901 
1902      while (globals[max_index+1])
1903           max_index++;
1904 
1905      if (max_index < 0)
1906           return;
1907 
1908      pthread_mutex_lock( &reactor->globals_lock );
1909 
1910      direct_list_foreach_safe (global, n, reactor->globals) {
1911           if (global->index < 0 || global->index > max_index) {
1912                D_WARN( "global reaction index out of bounds (%d/%d)", global->index, max_index );
1913           }
1914           else {
1915                // Remove the reaction if requested.
1916                if (globals[ global->index ]( msg_data, global->ctx ) == RS_REMOVE) {
1917                     // Mark as detached, since the global reaction is being
1918                     // removed from the global reaction list.
1919                     global->attached = false;
1920 
1921                     direct_list_remove( &reactor->globals, &global->link );
1922                }
1923           }
1924      }
1925 
1926      pthread_mutex_unlock( &reactor->globals_lock );
1927 }
1928 
1929 DirectResult
fusion_reactor_add_permissions(FusionReactor * reactor,FusionID fusion_id,FusionReactorPermissions permissions)1930 fusion_reactor_add_permissions( FusionReactor            *reactor,
1931                                 FusionID                  fusion_id,
1932                                 FusionReactorPermissions  permissions )
1933 {
1934      return DR_OK;
1935 }
1936 
1937 #endif
1938 
1939