1 /*
2 (c) Copyright 2001-2009 The world wide DirectFB Open Source Community (directfb.org)
3 (c) Copyright 2000-2004 Convergence (integrated media) GmbH
4
5 All rights reserved.
6
7 Written by Denis Oliver Kropp <dok@directfb.org>,
8 Andreas Hundt <andi@fischlustig.de>,
9 Sven Neumann <neo@directfb.org>,
10 Ville Syrjälä <syrjala@sci.fi> and
11 Claudio Ciccani <klan@users.sf.net>.
12
13 This library is free software; you can redistribute it and/or
14 modify it under the terms of the GNU Lesser General Public
15 License as published by the Free Software Foundation; either
16 version 2 of the License, or (at your option) any later version.
17
18 This library is distributed in the hope that it will be useful,
19 but WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21 Lesser General Public License for more details.
22
23 You should have received a copy of the GNU Lesser General Public
24 License along with this library; if not, write to the
25 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
26 Boston, MA 02111-1307, USA.
27 */
28
29 #include <config.h>
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include <unistd.h>
33 #include <errno.h>
34 #include <signal.h>
35
36 #include <sys/param.h>
37
38 #include <pthread.h>
39
40 #include <fusion/build.h>
41
42 #include <direct/debug.h>
43 #include <direct/list.h>
44 #include <direct/mem.h>
45 #include <direct/messages.h>
46 #include <direct/thread.h>
47 #include <direct/trace.h>
48 #include <direct/util.h>
49
50 #include <fusion/types.h>
51 #include <fusion/lock.h>
52 #include <fusion/shmalloc.h>
53 #include <fusion/reactor.h>
54
55 #include "fusion_internal.h"
56
57
58 #if FUSION_BUILD_MULTI
59
60 D_DEBUG_DOMAIN( Fusion_Reactor, "Fusion/Reactor", "Fusion's Reactor" );
61
62 struct __Fusion_FusionReactor {
63 int magic;
64
65 int id; /* reactor id */
66 int msg_size; /* size of each message */
67 bool direct;
68 bool destroyed;
69
70 DirectLink *globals;
71 FusionSkirmish *globals_lock;
72
73 FusionWorldShared *shared;
74
75 #if !FUSION_BUILD_KERNEL
76 DirectLink *listeners; /* list of attached listeners */
77 FusionSkirmish listeners_lock;
78
79 FusionCall *call;
80 #endif
81 };
82
83 typedef struct {
84 DirectLink link;
85
86 int magic;
87
88 pthread_rwlock_t lock;
89
90 int reactor_id;
91 FusionReactor *reactor;
92
93 DirectLink *links; /* reactor listeners attached to node */
94 } ReactorNode;
95
96 typedef struct {
97 DirectLink link;
98
99 int magic;
100
101 Reaction *reaction;
102 int channel;
103 } NodeLink;
104
105 /**************************************************************************************************/
106
107 static ReactorNode *lock_node ( int reactor_id,
108 bool add_it,
109 bool wlock,
110 FusionReactor *reactor, /* one of reactor and world must not be NULL */
111 FusionWorld *world );
112
113 static void unlock_node ( ReactorNode *node );
114
115 static void process_globals( FusionReactor *reactor,
116 const void *msg_data,
117 const ReactionFunc *globals );
118
119 /**************************************************************************************************/
120
121 #if FUSION_BUILD_KERNEL
122
123 FusionReactor *
fusion_reactor_new(int msg_size,const char * name,const FusionWorld * world)124 fusion_reactor_new( int msg_size,
125 const char *name,
126 const FusionWorld *world )
127 {
128 FusionEntryInfo info;
129 FusionReactor *reactor;
130 FusionWorldShared *shared;
131
132 D_ASSERT( name != NULL );
133 D_MAGIC_ASSERT( world, FusionWorld );
134
135 shared = world->shared;
136
137 D_MAGIC_ASSERT( shared, FusionWorldShared );
138
139 D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_new( '%s', size %d )\n", name ? : "", msg_size );
140
141 /* allocate shared reactor data */
142 reactor = SHCALLOC( shared->main_pool, 1, sizeof (FusionReactor) );
143 if (!reactor) {
144 D_OOSHM();
145 return NULL;
146 }
147
148 /* create a new reactor */
149 while (ioctl( world->fusion_fd, FUSION_REACTOR_NEW, &reactor->id )) {
150 if (errno == EINTR)
151 continue;
152
153 D_PERROR( "FUSION_REACTOR_NEW" );
154 SHFREE( shared->main_pool, reactor );
155 return NULL;
156 }
157
158 /* set the static message size, should we make dynamic? (TODO?) */
159 reactor->msg_size = msg_size;
160
161 /* Set default lock for global reactions. */
162 reactor->globals_lock = &shared->reactor_globals;
163
164 D_DEBUG_AT( Fusion_Reactor, " -> new reactor %p [%d] with lock %p [%d]\n",
165 reactor, reactor->id, reactor->globals_lock, reactor->globals_lock->multi.id );
166
167 reactor->shared = shared;
168 reactor->direct = true;
169
170 D_MAGIC_SET( reactor, FusionReactor );
171
172
173 info.type = FT_REACTOR;
174 info.id = reactor->id;
175
176 direct_snputs( info.name, name, sizeof(info.name) );
177
178 ioctl( world->fusion_fd, FUSION_ENTRY_SET_INFO, &info );
179
180 return reactor;
181 }
182
183 DirectResult
fusion_reactor_destroy(FusionReactor * reactor)184 fusion_reactor_destroy( FusionReactor *reactor )
185 {
186 FusionWorldShared *shared;
187
188 D_MAGIC_ASSERT( reactor, FusionReactor );
189
190 shared = reactor->shared;
191
192 D_MAGIC_ASSERT( shared, FusionWorldShared );
193
194 D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_destroy( %p [%d] )\n", reactor, reactor->id );
195
196 D_ASSUME( !reactor->destroyed );
197
198 if (reactor->destroyed)
199 return DR_DESTROYED;
200
201 while (ioctl( _fusion_fd( shared ), FUSION_REACTOR_DESTROY, &reactor->id )) {
202 switch (errno) {
203 case EINTR:
204 continue;
205
206 case EINVAL:
207 D_ERROR( "Fusion/Reactor: invalid reactor\n" );
208 return DR_DESTROYED;
209 }
210
211 D_PERROR( "FUSION_REACTOR_DESTROY" );
212 return DR_FUSION;
213 }
214
215 reactor->destroyed = true;
216
217 return DR_OK;
218 }
219
220 DirectResult
fusion_reactor_free(FusionReactor * reactor)221 fusion_reactor_free( FusionReactor *reactor )
222 {
223 FusionWorldShared *shared;
224
225 D_MAGIC_ASSERT( reactor, FusionReactor );
226
227 shared = reactor->shared;
228
229 D_MAGIC_ASSERT( shared, FusionWorldShared );
230
231 D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_free( %p [%d] )\n", reactor, reactor->id );
232
233 D_MAGIC_CLEAR( reactor );
234
235 // D_ASSUME( reactor->destroyed );
236
237 if (!reactor->destroyed)
238 while (ioctl( _fusion_fd( shared ), FUSION_REACTOR_DESTROY, &reactor->id ) && errno == EINTR);
239
240 /* free shared reactor data */
241 SHFREE( shared->main_pool, reactor );
242
243 return DR_OK;
244 }
245
246 DirectResult
fusion_reactor_attach_channel(FusionReactor * reactor,int channel,ReactionFunc func,void * ctx,Reaction * reaction)247 fusion_reactor_attach_channel( FusionReactor *reactor,
248 int channel,
249 ReactionFunc func,
250 void *ctx,
251 Reaction *reaction )
252 {
253 ReactorNode *node;
254 NodeLink *link;
255 FusionReactorAttach attach;
256
257 D_MAGIC_ASSERT( reactor, FusionReactor );
258 D_ASSERT( func != NULL );
259 D_ASSERT( reaction != NULL );
260
261 D_DEBUG_AT( Fusion_Reactor,
262 "fusion_reactor_attach( %p [%d], func %p, ctx %p, reaction %p )\n",
263 reactor, reactor->id, func, ctx, reaction );
264
265 link = D_CALLOC( 1, sizeof(NodeLink) );
266 if (!link)
267 return D_OOM();
268
269 node = lock_node( reactor->id, true, true, reactor, NULL );
270 if (!node) {
271 D_FREE( link );
272 return DR_FUSION;
273 }
274
275 attach.reactor_id = reactor->id;
276 attach.channel = channel;
277
278 while (ioctl( _fusion_fd( reactor->shared ), FUSION_REACTOR_ATTACH, &attach )) {
279 switch (errno) {
280 case EINTR:
281 continue;
282
283 case EINVAL:
284 D_ERROR( "Fusion/Reactor: invalid reactor\n" );
285 unlock_node( node );
286 D_FREE( link );
287 return DR_DESTROYED;
288 }
289
290 D_PERROR( "FUSION_REACTOR_ATTACH" );
291 unlock_node( node );
292 D_FREE( link );
293 return DR_FUSION;
294 }
295
296 /* fill out callback information */
297 reaction->func = func;
298 reaction->ctx = ctx;
299 reaction->node_link = link;
300
301 link->reaction = reaction;
302 link->channel = channel;
303
304 D_MAGIC_SET( link, NodeLink );
305
306 /* prepend the reaction to the local reaction list */
307 direct_list_prepend( &node->links, &link->link );
308
309 unlock_node( node );
310
311 return DR_OK;
312 }
313
314 static void
remove_node_link(ReactorNode * node,NodeLink * link)315 remove_node_link( ReactorNode *node,
316 NodeLink *link )
317 {
318 D_MAGIC_ASSERT( node, ReactorNode );
319 D_MAGIC_ASSERT( link, NodeLink );
320
321 D_ASSUME( link->reaction == NULL );
322
323 direct_list_remove( &node->links, &link->link );
324
325 D_MAGIC_CLEAR( link );
326
327 D_FREE( link );
328 }
329
330 DirectResult
fusion_reactor_detach(FusionReactor * reactor,Reaction * reaction)331 fusion_reactor_detach( FusionReactor *reactor,
332 Reaction *reaction )
333 {
334 ReactorNode *node;
335 NodeLink *link;
336
337 D_MAGIC_ASSERT( reactor, FusionReactor );
338 D_ASSERT( reaction != NULL );
339
340 D_DEBUG_AT( Fusion_Reactor,
341 "fusion_reactor_detach( %p [%d], reaction %p ) <- func %p, ctx %p\n",
342 reactor, reactor->id, reaction, reaction->func, reaction->ctx );
343
344 node = lock_node( reactor->id, false, true, reactor, NULL );
345 if (!node) {
346 D_BUG( "node not found" );
347 return DR_BUG;
348 }
349
350 link = reaction->node_link;
351 D_ASSUME( link != NULL );
352
353 if (link) {
354 FusionReactorDetach detach;
355
356 D_ASSERT( link->reaction == reaction );
357
358 detach.reactor_id = reactor->id;
359 detach.channel = link->channel;
360
361 reaction->node_link = NULL;
362
363 link->reaction = NULL;
364
365 remove_node_link( node, link );
366
367 while (ioctl( _fusion_fd( reactor->shared ), FUSION_REACTOR_DETACH, &detach )) {
368 switch (errno) {
369 case EINTR:
370 continue;
371
372 case EINVAL:
373 D_ERROR( "Fusion/Reactor: invalid reactor\n" );
374 unlock_node( node );
375 return DR_DESTROYED;
376 }
377
378 D_PERROR( "FUSION_REACTOR_DETACH" );
379 unlock_node( node );
380 return DR_FUSION;
381 }
382 }
383
384 unlock_node( node );
385
386 return DR_OK;
387 }
388
389 DirectResult
fusion_reactor_dispatch_channel(FusionReactor * reactor,int channel,const void * msg_data,int msg_size,bool self,const ReactionFunc * globals)390 fusion_reactor_dispatch_channel( FusionReactor *reactor,
391 int channel,
392 const void *msg_data,
393 int msg_size,
394 bool self,
395 const ReactionFunc *globals )
396 {
397 FusionReactorDispatch dispatch;
398
399 D_MAGIC_ASSERT( reactor, FusionReactor );
400
401 D_ASSERT( msg_data != NULL );
402
403 D_DEBUG_AT( Fusion_Reactor,
404 "fusion_reactor_dispatch( %p [%d], msg_data %p, self %s, globals %p)\n",
405 reactor, reactor->id, msg_data, self ? "true" : "false", globals );
406
407 /* Handle global reactions first. */
408 if (reactor->globals) {
409 if (globals)
410 process_globals( reactor, msg_data, globals );
411 else
412 D_ERROR( "Fusion/Reactor: global reactions exist but no "
413 "globals have been passed to dispatch()\n" );
414 }
415
416 /* Handle local reactions. */
417 if (self && reactor->direct) {
418 _fusion_reactor_process_message( _fusion_world(reactor->shared), reactor->id, channel, msg_data );
419 self = false;
420 }
421
422 /* Initialize dispatch data. */
423 dispatch.reactor_id = reactor->id;
424 dispatch.channel = channel;
425 dispatch.self = self;
426 dispatch.msg_size = msg_size;
427 dispatch.msg_data = msg_data;
428
429 /* Dispatch the message to handle foreign reactions. */
430 while (ioctl( _fusion_fd( reactor->shared ), FUSION_REACTOR_DISPATCH, &dispatch )) {
431 switch (errno) {
432 case EINTR:
433 continue;
434
435 case EINVAL:
436 D_ERROR( "Fusion/Reactor: invalid reactor\n" );
437 return DR_DESTROYED;
438 }
439
440 D_PERROR( "FUSION_REACTOR_DISPATCH" );
441 return DR_FUSION;
442 }
443
444 return DR_OK;
445 }
446
447 DirectResult
fusion_reactor_set_dispatch_callback(FusionReactor * reactor,FusionCall * call,void * call_ptr)448 fusion_reactor_set_dispatch_callback( FusionReactor *reactor,
449 FusionCall *call,
450 void *call_ptr )
451 {
452 FusionReactorSetCallback callback;
453
454 D_MAGIC_ASSERT( reactor, FusionReactor );
455 D_ASSERT( call != NULL );
456
457 D_DEBUG_AT( Fusion_Reactor,
458 "fusion_reactor_set_dispatch_callback( %p [%d], call %p [%d], ptr %p)\n",
459 reactor, reactor->id, call, call->call_id, call_ptr );
460
461 /* Fill callback info. */
462 callback.reactor_id = reactor->id;
463 callback.call_id = call->call_id;
464 callback.call_ptr = call_ptr;
465
466 /* Set the dispatch callback. */
467 while (ioctl( _fusion_fd( reactor->shared ), FUSION_REACTOR_SET_DISPATCH_CALLBACK, &callback )) {
468 switch (errno) {
469 case EINTR:
470 continue;
471
472 case EINVAL:
473 D_ERROR( "Fusion/Reactor: invalid reactor\n" );
474 return DR_DESTROYED;
475 }
476
477 D_PERROR( "FUSION_REACTOR_SET_DISPATCH_CALLBACK" );
478 return DR_FUSION;
479 }
480
481 return DR_OK;
482 }
483
484 DirectResult
fusion_reactor_set_name(FusionReactor * reactor,const char * name)485 fusion_reactor_set_name( FusionReactor *reactor,
486 const char *name )
487 {
488 FusionEntryInfo info;
489
490 D_MAGIC_ASSERT( reactor, FusionReactor );
491 D_ASSERT( name != NULL );
492
493 D_DEBUG_AT( Fusion_Reactor, "%s( %p, '%s' )\n", __FUNCTION__, reactor, name );
494
495 /* Initialize reactor info. */
496 info.type = FT_REACTOR;
497 info.id = reactor->id;
498
499 /* Put reactor name into info. */
500 direct_snputs( info.name, name, sizeof(info.name) );
501
502 /* Set the reactor info. */
503 while (ioctl( _fusion_fd( reactor->shared ), FUSION_ENTRY_SET_INFO, &info )) {
504 switch (errno) {
505 case EINTR:
506 continue;
507
508 case EINVAL:
509 D_ERROR( "Fusion/Reactor: invalid reactor\n" );
510 return DR_IDNOTFOUND;
511 }
512
513 D_PERROR( "FUSION_ENTRY_SET_INFO( reactor 0x%08x, '%s' )\n", reactor->id, name );
514 return DR_FUSION;
515 }
516
517 return DR_OK;
518 }
519
520 DirectResult
fusion_reactor_add_permissions(FusionReactor * reactor,FusionID fusion_id,FusionReactorPermissions reactor_permissions)521 fusion_reactor_add_permissions( FusionReactor *reactor,
522 FusionID fusion_id,
523 FusionReactorPermissions reactor_permissions )
524 {
525 FusionEntryPermissions permissions;
526
527 permissions.type = FT_REACTOR;
528 permissions.id = reactor->id;
529 permissions.fusion_id = fusion_id;
530 permissions.permissions = 0;
531
532 if (reactor_permissions & FUSION_REACTOR_PERMIT_ATTACH_DETACH) {
533 FUSION_ENTRY_PERMISSIONS_ADD( permissions.permissions, FUSION_REACTOR_ATTACH );
534 FUSION_ENTRY_PERMISSIONS_ADD( permissions.permissions, FUSION_REACTOR_DETACH );
535 }
536
537 if (reactor_permissions & FUSION_REACTOR_PERMIT_DISPATCH)
538 FUSION_ENTRY_PERMISSIONS_ADD( permissions.permissions, FUSION_REACTOR_DISPATCH );
539
540 while (ioctl( _fusion_fd( reactor->shared ), FUSION_ENTRY_ADD_PERMISSIONS, &permissions ) < 0) {
541 if (errno != EINTR) {
542 D_PERROR( "Fusion/Reactor: FUSION_ENTRY_ADD_PERMISSIONS( id %d ) failed!\n", reactor->id );
543 return DR_FAILURE;
544 }
545 }
546
547 return DR_OK;
548 }
549
550 void
_fusion_reactor_process_message(FusionWorld * world,int reactor_id,int channel,const void * msg_data)551 _fusion_reactor_process_message( FusionWorld *world,
552 int reactor_id,
553 int channel,
554 const void *msg_data )
555 {
556 ReactorNode *node;
557 NodeLink *link;
558
559 D_MAGIC_ASSERT( world, FusionWorld );
560 D_ASSERT( msg_data != NULL );
561
562 D_DEBUG_AT( Fusion_Reactor,
563 " _fusion_reactor_process_message( [%d], msg_data %p )\n", reactor_id, msg_data );
564
565 /* Find the local counter part of the reactor. */
566 node = lock_node( reactor_id, false, false, NULL, world );
567 if (!node)
568 return;
569
570 D_DEBUG_AT( Fusion_Reactor, " -> node %p, reactor %p\n", node, node->reactor );
571
572 D_ASSUME( node->links != NULL );
573
574 if (!node->links) {
575 D_DEBUG_AT( Fusion_Reactor, " -> no local reactions!?!\n" );
576 unlock_node( node );
577 return;
578 }
579
580 direct_list_foreach (link, node->links) {
581 Reaction *reaction;
582
583 D_MAGIC_ASSERT( link, NodeLink );
584
585 if (link->channel != channel)
586 continue;
587
588 reaction = link->reaction;
589 if (!reaction)
590 continue;
591
592 if (reaction->func( msg_data, reaction->ctx ) == RS_REMOVE) {
593 FusionReactorDetach detach;
594
595 detach.reactor_id = reactor_id;
596 detach.channel = channel;
597
598 D_DEBUG_AT( Fusion_Reactor, " -> removing %p, func %p, ctx %p\n",
599 reaction, reaction->func, reaction->ctx );
600
601 link->reaction = NULL;
602
603 /* We can't remove the link as we only have read lock, to avoid dead locks. */
604
605 while (ioctl( world->fusion_fd, FUSION_REACTOR_DETACH, &detach )) {
606 switch (errno) {
607 case EINTR:
608 continue;
609
610 case EINVAL:
611 D_ERROR( "Fusion/Reactor: invalid reactor (DETACH)\n" );
612 break;
613
614 default:
615 D_PERROR( "FUSION_REACTOR_DETACH" );
616 break;
617 }
618
619 break;
620 }
621 }
622 }
623
624 unlock_node( node );
625 }
626
627 #else /* FUSION_BUILD_KERNEL */
628
629 typedef struct {
630 DirectLink link;
631
632 unsigned int refs;
633
634 FusionID fusion_id;
635 int channel;
636 } __Listener;
637
638
639 FusionReactor *
fusion_reactor_new(int msg_size,const char * name,const FusionWorld * world)640 fusion_reactor_new( int msg_size,
641 const char *name,
642 const FusionWorld *world )
643 {
644 FusionReactor *reactor;
645 FusionWorldShared *shared;
646
647 D_ASSERT( name != NULL );
648 D_MAGIC_ASSERT( world, FusionWorld );
649
650 shared = world->shared;
651
652 D_MAGIC_ASSERT( shared, FusionWorldShared );
653
654 D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_new( '%s', size %d )\n", name ? : "", msg_size );
655
656 /* allocate shared reactor data */
657 reactor = SHCALLOC( shared->main_pool, 1, sizeof (FusionReactor) );
658 if (!reactor) {
659 D_OOSHM();
660 return NULL;
661 }
662
663 /* Generate the reactor id */
664 reactor->id = ++shared->reactor_ids;
665
666 /* Set the static message size, should we make dynamic? (TODO?) */
667 reactor->msg_size = msg_size;
668
669 /* Set default lock for global reactions. */
670 reactor->globals_lock = &shared->reactor_globals;
671
672 fusion_skirmish_init( &reactor->listeners_lock, "Reactor Listeners", world );
673
674 D_DEBUG_AT( Fusion_Reactor, " -> new reactor %p [%d] with lock %p [%d]\n",
675 reactor, reactor->id, reactor->globals_lock, reactor->globals_lock->multi.id );
676
677 reactor->shared = shared;
678 reactor->direct = true;
679
680 D_MAGIC_SET( reactor, FusionReactor );
681
682 return reactor;
683 }
684
685 DirectResult
fusion_reactor_destroy(FusionReactor * reactor)686 fusion_reactor_destroy( FusionReactor *reactor )
687 {
688 FusionWorldShared *shared;
689
690 D_MAGIC_ASSERT( reactor, FusionReactor );
691
692 shared = reactor->shared;
693
694 D_MAGIC_ASSERT( shared, FusionWorldShared );
695
696 D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_destroy( %p [%d] )\n", reactor, reactor->id );
697
698 D_ASSUME( !reactor->destroyed );
699
700 if (reactor->destroyed)
701 return DR_DESTROYED;
702
703 fusion_skirmish_destroy( &reactor->listeners_lock );
704
705 reactor->destroyed = true;
706
707 return DR_OK;
708 }
709
710 DirectResult
fusion_reactor_free(FusionReactor * reactor)711 fusion_reactor_free( FusionReactor *reactor )
712 {
713 FusionWorldShared *shared;
714 __Listener *listener, *temp;
715
716 D_MAGIC_ASSERT( reactor, FusionReactor );
717
718 shared = reactor->shared;
719
720 D_MAGIC_ASSERT( shared, FusionWorldShared );
721
722 D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_free( %p [%d] )\n", reactor, reactor->id );
723
724 D_MAGIC_CLEAR( reactor );
725
726 // D_ASSUME( reactor->destroyed );
727
728 direct_list_foreach_safe (listener, temp, reactor->listeners) {
729 direct_list_remove( &reactor->listeners, &listener->link );
730 SHFREE( shared->main_pool, listener );
731 }
732
733 /* free shared reactor data */
734 SHFREE( shared->main_pool, reactor );
735
736 return DR_OK;
737 }
738
739 DirectResult
fusion_reactor_attach_channel(FusionReactor * reactor,int channel,ReactionFunc func,void * ctx,Reaction * reaction)740 fusion_reactor_attach_channel( FusionReactor *reactor,
741 int channel,
742 ReactionFunc func,
743 void *ctx,
744 Reaction *reaction )
745 {
746 FusionWorldShared *shared;
747 ReactorNode *node;
748 NodeLink *link;
749 FusionID fusion_id;
750 __Listener *listener;
751
752 D_MAGIC_ASSERT( reactor, FusionReactor );
753 D_ASSERT( func != NULL );
754 D_ASSERT( reaction != NULL );
755
756 D_DEBUG_AT( Fusion_Reactor,
757 "fusion_reactor_attach( %p [%d], func %p, ctx %p, reaction %p )\n",
758 reactor, reactor->id, func, ctx, reaction );
759
760 if (reactor->destroyed)
761 return DR_DESTROYED;
762
763 shared = reactor->shared;
764
765 link = D_CALLOC( 1, sizeof(NodeLink) );
766 if (!link)
767 return D_OOM();
768
769 node = lock_node( reactor->id, true, true, reactor, NULL );
770 if (!node) {
771 D_FREE( link );
772 return DR_FUSION;
773 }
774
775 fusion_id = _fusion_id( shared );
776
777 fusion_skirmish_prevail( &reactor->listeners_lock );
778
779 direct_list_foreach (listener, reactor->listeners) {
780 if (listener->fusion_id == fusion_id && listener->channel == channel) {
781 listener->refs++;
782 break;
783 }
784 }
785
786 if (!listener) {
787 listener = SHCALLOC( shared->main_pool, 1, sizeof(__Listener) );
788 if (!listener) {
789 D_OOSHM();
790 fusion_skirmish_dismiss( &reactor->listeners_lock );
791 unlock_node( node );
792 D_FREE( link );
793 return DR_NOSHAREDMEMORY;
794 }
795
796 listener->refs = 1;
797 listener->fusion_id = fusion_id;
798 listener->channel = channel;
799
800 direct_list_append( &reactor->listeners, &listener->link );
801 }
802
803 fusion_skirmish_dismiss( &reactor->listeners_lock );
804
805 /* fill out callback information */
806 reaction->func = func;
807 reaction->ctx = ctx;
808 reaction->node_link = link;
809
810 link->reaction = reaction;
811 link->channel = channel;
812
813 D_MAGIC_SET( link, NodeLink );
814
815 /* prepend the reaction to the local reaction list */
816 direct_list_prepend( &node->links, &link->link );
817
818 unlock_node( node );
819
820 return DR_OK;
821 }
822
823 static void
remove_node_link(ReactorNode * node,NodeLink * link)824 remove_node_link( ReactorNode *node,
825 NodeLink *link )
826 {
827 D_MAGIC_ASSERT( node, ReactorNode );
828 D_MAGIC_ASSERT( link, NodeLink );
829
830 D_ASSUME( link->reaction == NULL );
831
832 direct_list_remove( &node->links, &link->link );
833
834 D_MAGIC_CLEAR( link );
835
836 D_FREE( link );
837 }
838
839 DirectResult
fusion_reactor_detach(FusionReactor * reactor,Reaction * reaction)840 fusion_reactor_detach( FusionReactor *reactor,
841 Reaction *reaction )
842 {
843 FusionWorldShared *shared;
844 ReactorNode *node;
845 NodeLink *link;
846
847 D_MAGIC_ASSERT( reactor, FusionReactor );
848 D_ASSERT( reaction != NULL );
849
850 D_DEBUG_AT( Fusion_Reactor,
851 "fusion_reactor_detach( %p [%d], reaction %p ) <- func %p, ctx %p\n",
852 reactor, reactor->id, reaction, reaction->func, reaction->ctx );
853
854 if (reactor->destroyed)
855 return DR_DESTROYED;
856
857 shared = reactor->shared;
858
859 node = lock_node( reactor->id, false, true, reactor, NULL );
860 if (!node) {
861 D_BUG( "node not found" );
862 return DR_BUG;
863 }
864
865 link = reaction->node_link;
866 D_ASSUME( link != NULL );
867
868 if (link) {
869 __Listener *listener;
870 FusionID fusion_id = _fusion_id( shared );
871
872 D_ASSERT( link->reaction == reaction );
873
874 reaction->node_link = NULL;
875
876 link->reaction = NULL;
877
878 remove_node_link( node, link );
879
880 fusion_skirmish_prevail( &reactor->listeners_lock );
881
882 direct_list_foreach (listener, reactor->listeners) {
883 if (listener->fusion_id == fusion_id && listener->channel == link->channel) {
884 if (--listener->refs == 0) {
885 direct_list_remove( &reactor->listeners, &listener->link );
886 SHFREE( shared->main_pool, listener );
887 }
888 break;
889 }
890 }
891
892 fusion_skirmish_dismiss( &reactor->listeners_lock );
893
894 if (!listener)
895 D_ERROR( "Fusion/Reactor: Couldn't detach listener!\n" );
896 }
897
898 unlock_node( node );
899
900 return DR_OK;
901 }
902
903 DirectResult
fusion_reactor_dispatch_channel(FusionReactor * reactor,int channel,const void * msg_data,int msg_size,bool self,const ReactionFunc * globals)904 fusion_reactor_dispatch_channel( FusionReactor *reactor,
905 int channel,
906 const void *msg_data,
907 int msg_size,
908 bool self,
909 const ReactionFunc *globals )
910 {
911 FusionWorld *world;
912 __Listener *listener, *temp;
913 FusionRef *ref = NULL;
914 FusionReactorMessage *msg;
915 struct sockaddr_un addr;
916 int len;
917
918 D_MAGIC_ASSERT( reactor, FusionReactor );
919
920 D_ASSERT( msg_data != NULL );
921
922 D_DEBUG_AT( Fusion_Reactor,
923 "fusion_reactor_dispatch( %p [%d], msg_data %p, self %s, globals %p)\n",
924 reactor, reactor->id, msg_data, self ? "true" : "false", globals );
925
926 if (reactor->destroyed)
927 return DR_DESTROYED;
928
929 if (msg_size > FUSION_MESSAGE_SIZE-sizeof(FusionReactorMessage)) {
930 D_ERROR( "Fusion/Reactor: Message too large (%d)!\n", msg_size );
931 return DR_UNSUPPORTED;
932 }
933
934 world = _fusion_world( reactor->shared );
935
936 if (reactor->call) {
937 ref = SHMALLOC( world->shared->main_pool, sizeof(FusionRef) );
938 if (!ref)
939 return D_OOSHM();
940
941 fusion_ref_init( ref, "Dispatch Ref", world );
942 fusion_ref_up( ref, true );
943 fusion_ref_watch( ref, reactor->call, 0 );
944 }
945
946 /* Handle global reactions first. */
947 if (reactor->globals) {
948 if (globals)
949 process_globals( reactor, msg_data, globals );
950 else
951 D_ERROR( "Fusion/Reactor: global reactions exist but no "
952 "globals have been passed to dispatch()\n" );
953 }
954
955 /* Handle local reactions. */
956 if (self && reactor->direct) {
957 _fusion_reactor_process_message( _fusion_world(reactor->shared), reactor->id, channel, msg_data );
958 self = false;
959 }
960
961 msg = alloca( sizeof(FusionReactorMessage) + msg_size );
962
963 msg->type = FMT_REACTOR;
964 msg->id = reactor->id;
965 msg->channel = channel;
966 msg->ref = ref;
967
968 memcpy( (void*)msg + sizeof(FusionReactorMessage), msg_data, msg_size );
969
970 addr.sun_family = AF_UNIX;
971 len = snprintf( addr.sun_path, sizeof(addr.sun_path),
972 "/tmp/.fusion-%d/", fusion_world_index( world ) );
973
974 fusion_skirmish_prevail( &reactor->listeners_lock );
975
976 direct_list_foreach_safe (listener, temp, reactor->listeners) {
977 if (listener->channel == channel) {
978 DirectResult ret;
979
980 if (!self && listener->fusion_id == world->fusion_id)
981 continue;
982
983 if (ref)
984 fusion_ref_up( ref, true );
985
986 snprintf( addr.sun_path+len, sizeof(addr.sun_path)-len, "%lx", listener->fusion_id );
987
988 D_DEBUG_AT( Fusion_Reactor, " -> sending to '%s'\n", addr.sun_path );
989
990 ret = _fusion_send_message( world->fusion_fd, msg, sizeof(FusionReactorMessage)+msg_size, &addr );
991 if (ret == DR_FUSION) {
992 D_DEBUG_AT( Fusion_Reactor, " -> removing dead listener %lu\n", listener->fusion_id );
993
994 if (ref)
995 fusion_ref_down( ref, true );
996
997 direct_list_remove( &reactor->listeners, &listener->link );
998
999 SHFREE( reactor->shared->main_pool, listener );
1000 }
1001 }
1002 }
1003
1004 fusion_skirmish_dismiss( &reactor->listeners_lock );
1005
1006 if (ref) {
1007 fusion_ref_down( ref, true );
1008 if (fusion_ref_zero_trylock( ref ) == DR_OK) {
1009 fusion_ref_destroy( ref );
1010 SHFREE( world->shared->main_pool, ref );
1011 }
1012 }
1013
1014 D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_dispatch( %p ) done.\n", reactor );
1015
1016 return DR_OK;
1017 }
1018
1019 DirectResult
fusion_reactor_set_dispatch_callback(FusionReactor * reactor,FusionCall * call,void * call_ptr)1020 fusion_reactor_set_dispatch_callback( FusionReactor *reactor,
1021 FusionCall *call,
1022 void *call_ptr )
1023 {
1024 D_MAGIC_ASSERT( reactor, FusionReactor );
1025 D_ASSERT( call != NULL );
1026
1027 D_DEBUG_AT( Fusion_Reactor,
1028 "fusion_reactor_set_dispatch_callback( %p [%d], call %p [%d], ptr %p)\n",
1029 reactor, reactor->id, call, call->call_id, call_ptr );
1030
1031 if (reactor->destroyed)
1032 return DR_DESTROYED;
1033
1034 if (call_ptr)
1035 return DR_UNIMPLEMENTED;
1036
1037 reactor->call = call;
1038
1039 return DR_OK;
1040 }
1041
1042 DirectResult
fusion_reactor_set_name(FusionReactor * reactor,const char * name)1043 fusion_reactor_set_name( FusionReactor *reactor,
1044 const char *name )
1045 {
1046 D_UNIMPLEMENTED();
1047
1048 return DR_UNIMPLEMENTED;
1049 }
1050
1051 DirectResult
fusion_reactor_add_permissions(FusionReactor * reactor,FusionID fusion_id,FusionReactorPermissions permissions)1052 fusion_reactor_add_permissions( FusionReactor *reactor,
1053 FusionID fusion_id,
1054 FusionReactorPermissions permissions )
1055 {
1056 D_UNIMPLEMENTED();
1057
1058 return DR_UNIMPLEMENTED;
1059 }
1060
1061 void
_fusion_reactor_process_message(FusionWorld * world,int reactor_id,int channel,const void * msg_data)1062 _fusion_reactor_process_message( FusionWorld *world,
1063 int reactor_id,
1064 int channel,
1065 const void *msg_data )
1066 {
1067 ReactorNode *node;
1068 NodeLink *link;
1069
1070 D_MAGIC_ASSERT( world, FusionWorld );
1071 D_ASSERT( msg_data != NULL );
1072
1073 D_DEBUG_AT( Fusion_Reactor,
1074 " _fusion_reactor_process_message( [%d], msg_data %p )\n", reactor_id, msg_data );
1075
1076 /* Find the local counter part of the reactor. */
1077 node = lock_node( reactor_id, false, false, NULL, world );
1078 if (!node)
1079 return;
1080
1081 D_DEBUG_AT( Fusion_Reactor, " -> node %p, reactor %p\n", node, node->reactor );
1082
1083 D_ASSUME( node->links != NULL );
1084
1085 if (!node->links) {
1086 D_DEBUG_AT( Fusion_Reactor, " -> no local reactions!?!\n" );
1087 unlock_node( node );
1088 return;
1089 }
1090
1091 direct_list_foreach (link, node->links) {
1092 Reaction *reaction;
1093
1094 D_MAGIC_ASSERT( link, NodeLink );
1095
1096 if (link->channel != channel)
1097 continue;
1098
1099 reaction = link->reaction;
1100 if (!reaction)
1101 continue;
1102
1103 if (reaction->func( msg_data, reaction->ctx ) == RS_REMOVE) {
1104 FusionReactor *reactor = node->reactor;
1105 __Listener *listener;
1106
1107 D_DEBUG_AT( Fusion_Reactor, " -> removing %p, func %p, ctx %p\n",
1108 reaction, reaction->func, reaction->ctx );
1109
1110 fusion_skirmish_prevail( &reactor->listeners_lock );
1111
1112 direct_list_foreach (listener, reactor->listeners) {
1113 if (listener->fusion_id == world->fusion_id && listener->channel == channel) {
1114 if (--listener->refs == 0) {
1115 direct_list_remove( &reactor->listeners, &listener->link );
1116 SHFREE( world->shared->main_pool, listener );
1117 }
1118 break;
1119 }
1120 }
1121
1122 fusion_skirmish_dismiss( &reactor->listeners_lock );
1123 }
1124 }
1125
1126 unlock_node( node );
1127 }
1128
1129 #endif /* FUSION_BUILD_KERNEL */
1130
1131
1132 DirectResult
fusion_reactor_set_lock(FusionReactor * reactor,FusionSkirmish * lock)1133 fusion_reactor_set_lock( FusionReactor *reactor,
1134 FusionSkirmish *lock )
1135 {
1136 DirectResult ret;
1137 FusionSkirmish *old;
1138
1139 D_MAGIC_ASSERT( reactor, FusionReactor );
1140
1141 old = reactor->globals_lock;
1142
1143 D_ASSERT( lock != NULL );
1144 D_ASSERT( old != NULL );
1145
1146 D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_set_lock( %p [%d], lock %p [%d] ) <- old %p [%d]\n",
1147 reactor, reactor->id, lock, lock->multi.id, old, old->multi.id );
1148
1149 /*
1150 * Acquire the old lock to make sure that changing the lock doesn't
1151 * result in mismatching lock/unlock pairs in other functions.
1152 */
1153 ret = fusion_skirmish_prevail( old );
1154 if (ret)
1155 return ret;
1156
1157 D_ASSUME( reactor->globals_lock != lock );
1158
1159 /* Set the lock replacement. */
1160 reactor->globals_lock = lock;
1161
1162 /* Release the old lock which is obsolete now. */
1163 fusion_skirmish_dismiss( old );
1164
1165 return DR_OK;
1166 }
1167
1168 DirectResult
fusion_reactor_set_lock_only(FusionReactor * reactor,FusionSkirmish * lock)1169 fusion_reactor_set_lock_only( FusionReactor *reactor,
1170 FusionSkirmish *lock )
1171 {
1172 D_MAGIC_ASSERT( reactor, FusionReactor );
1173 D_ASSERT( lock != NULL );
1174
1175 D_DEBUG_AT( Fusion_Reactor, "fusion_reactor_set_lock_only( %p [%d], lock %p [%d] ) <- old %p [%d]\n",
1176 reactor, reactor->id, lock, lock->multi.id, reactor->globals_lock, reactor->globals_lock->multi.id );
1177
1178 D_ASSUME( reactor->globals_lock != lock );
1179
1180 /* Set the lock replacement. */
1181 reactor->globals_lock = lock;
1182
1183 return DR_OK;
1184 }
1185
1186 DirectResult
fusion_reactor_attach(FusionReactor * reactor,ReactionFunc func,void * ctx,Reaction * reaction)1187 fusion_reactor_attach (FusionReactor *reactor,
1188 ReactionFunc func,
1189 void *ctx,
1190 Reaction *reaction)
1191 {
1192 D_MAGIC_ASSERT( reactor, FusionReactor );
1193 D_ASSERT( func != NULL );
1194 D_ASSERT( reaction != NULL );
1195
1196 return fusion_reactor_attach_channel( reactor, 0, func, ctx, reaction );
1197 }
1198
1199 DirectResult
fusion_reactor_attach_global(FusionReactor * reactor,int index,void * ctx,GlobalReaction * reaction)1200 fusion_reactor_attach_global( FusionReactor *reactor,
1201 int index,
1202 void *ctx,
1203 GlobalReaction *reaction )
1204 {
1205 DirectResult ret;
1206 FusionSkirmish *lock;
1207
1208 D_MAGIC_ASSERT( reactor, FusionReactor );
1209
1210 D_ASSERT( index >= 0 );
1211 D_ASSERT( reaction != NULL );
1212
1213 D_DEBUG_AT( Fusion_Reactor,
1214 "fusion_reactor_attach_global( %p [%d], index %d, ctx %p, reaction %p )\n",
1215 reactor, reactor->id, index, ctx, reaction );
1216
1217 /* Initialize reaction data. */
1218 reaction->index = index;
1219 reaction->ctx = ctx;
1220 reaction->attached = true;
1221
1222 /* Remember for safety. */
1223 lock = reactor->globals_lock;
1224
1225 D_ASSERT( lock != NULL );
1226
1227 /* Lock the list of global reactions. */
1228 ret = fusion_skirmish_prevail( lock );
1229 if (ret)
1230 return ret;
1231
1232 /* FIXME: Might have changed while waiting for the lock. */
1233 if (lock != reactor->globals_lock)
1234 D_WARN( "using old lock once more" );
1235
1236 /* Prepend the reaction to the list. */
1237 direct_list_prepend( &reactor->globals, &reaction->link );
1238
1239 /* Unlock the list of global reactions. */
1240 fusion_skirmish_dismiss( lock );
1241
1242 return DR_OK;
1243 }
1244
1245 DirectResult
fusion_reactor_detach_global(FusionReactor * reactor,GlobalReaction * reaction)1246 fusion_reactor_detach_global( FusionReactor *reactor,
1247 GlobalReaction *reaction )
1248 {
1249 DirectResult ret;
1250 FusionSkirmish *lock;
1251
1252 D_MAGIC_ASSERT( reactor, FusionReactor );
1253
1254 D_ASSERT( reaction != NULL );
1255
1256 D_DEBUG_AT( Fusion_Reactor,
1257 "fusion_reactor_detach_global( %p [%d], reaction %p ) <- index %d, ctx %p\n",
1258 reactor, reactor->id, reaction, reaction->index, reaction->ctx );
1259
1260 /* Remember for safety. */
1261 lock = reactor->globals_lock;
1262
1263 D_ASSERT( lock != NULL );
1264
1265 /* Lock the list of global reactions. */
1266 ret = fusion_skirmish_prevail( lock );
1267 if (ret)
1268 return ret;
1269
1270 /* FIXME: Might have changed while waiting for the lock. */
1271 if (lock != reactor->globals_lock)
1272 D_WARN( "using old lock once more" );
1273
1274 // Do not assume the reaction is attached, because it is possible that it may not be.
1275 //D_ASSUME( reaction->attached );
1276
1277 /* Check against multiple detach. */
1278 if (reaction->attached) {
1279 /* Mark as detached. */
1280 reaction->attached = false;
1281
1282 /* Remove the reaction from the list. */
1283 direct_list_remove( &reactor->globals, &reaction->link );
1284 }
1285
1286 /* Unlock the list of global reactions. */
1287 fusion_skirmish_dismiss( lock );
1288
1289 return DR_OK;
1290 }
1291
1292 DirectResult
fusion_reactor_dispatch(FusionReactor * reactor,const void * msg_data,bool self,const ReactionFunc * globals)1293 fusion_reactor_dispatch( FusionReactor *reactor,
1294 const void *msg_data,
1295 bool self,
1296 const ReactionFunc *globals )
1297 {
1298 D_MAGIC_ASSERT( reactor, FusionReactor );
1299
1300 return fusion_reactor_dispatch_channel( reactor, 0, msg_data, reactor->msg_size, self, globals );
1301 }
1302
1303 DirectResult
fusion_reactor_sized_dispatch(FusionReactor * reactor,const void * msg_data,int msg_size,bool self,const ReactionFunc * globals)1304 fusion_reactor_sized_dispatch( FusionReactor *reactor,
1305 const void *msg_data,
1306 int msg_size,
1307 bool self,
1308 const ReactionFunc *globals )
1309 {
1310 D_MAGIC_ASSERT( reactor, FusionReactor );
1311
1312 return fusion_reactor_dispatch_channel( reactor, 0, msg_data, msg_size, self, globals );
1313 }
1314
1315 DirectResult
fusion_reactor_direct(FusionReactor * reactor,bool direct)1316 fusion_reactor_direct( FusionReactor *reactor, bool direct )
1317 {
1318 D_MAGIC_ASSERT( reactor, FusionReactor );
1319
1320 reactor->direct = direct;
1321
1322 return DR_OK;
1323 }
1324
1325
1326 void
_fusion_reactor_free_all(FusionWorld * world)1327 _fusion_reactor_free_all( FusionWorld *world )
1328 {
1329 ReactorNode *node, *node_temp;
1330
1331 D_MAGIC_ASSERT( world, FusionWorld );
1332
1333 D_DEBUG_AT( Fusion_Reactor, "_fusion_reactor_free_all() <- nodes %p\n", world->reactor_nodes );
1334
1335
1336 direct_mutex_lock( &world->reactor_nodes_lock );
1337
1338 direct_list_foreach_safe (node, node_temp, world->reactor_nodes) {
1339 NodeLink *link, *link_temp;
1340
1341 D_MAGIC_ASSERT( node, ReactorNode );
1342
1343 pthread_rwlock_wrlock( &node->lock );
1344
1345 direct_list_foreach_safe (link, link_temp, node->links) {
1346 D_MAGIC_ASSERT( link, NodeLink );
1347
1348 D_MAGIC_CLEAR( link );
1349
1350 D_FREE( link );
1351 }
1352
1353 pthread_rwlock_unlock( &node->lock );
1354 pthread_rwlock_destroy( &node->lock );
1355
1356 D_MAGIC_CLEAR( node );
1357
1358 D_FREE( node );
1359 }
1360
1361 world->reactor_nodes = NULL;
1362
1363 direct_mutex_unlock( &world->reactor_nodes_lock );
1364 }
1365
1366 static void
process_globals(FusionReactor * reactor,const void * msg_data,const ReactionFunc * globals)1367 process_globals( FusionReactor *reactor,
1368 const void *msg_data,
1369 const ReactionFunc *globals )
1370 {
1371 DirectLink *n;
1372 GlobalReaction *global;
1373 FusionSkirmish *lock;
1374 int max_index = -1;
1375
1376 D_MAGIC_ASSERT( reactor, FusionReactor );
1377
1378 D_ASSERT( msg_data != NULL );
1379 D_ASSERT( globals != NULL );
1380
1381 /* D_DEBUG_AT( Fusion_Reactor, " process_globals( %p [%d], msg_data %p, globals %p )\n",
1382 reactor, reactor->id, msg_data, globals );*/
1383
1384 /* Find maximum reaction index. */
1385 while (globals[max_index+1])
1386 max_index++;
1387
1388 if (max_index < 0)
1389 return;
1390
1391 /* Remember for safety. */
1392 lock = reactor->globals_lock;
1393
1394 D_ASSERT( lock != NULL );
1395
1396 /* Lock the list of global reactions. */
1397 if (fusion_skirmish_prevail( lock ))
1398 return;
1399
1400 /* FIXME: Might have changed while waiting for the lock. */
1401 if (lock != reactor->globals_lock)
1402 D_WARN( "using old lock once more" );
1403
1404 /* Loop through all global reactions. */
1405 direct_list_foreach_safe (global, n, reactor->globals) {
1406 int index = global->index;
1407
1408 /* Check if the index is valid. */
1409 if (index < 0 || index > max_index) {
1410 D_WARN( "index out of bounds (%d/%d)", global->index, max_index );
1411 continue;
1412 }
1413
1414 /* Call reaction and remove it if requested. */
1415 if (globals[global->index]( msg_data, global->ctx ) == RS_REMOVE) {
1416 /*D_DEBUG_AT( Fusion_Reactor, " -> removing %p, index %d, ctx %p\n",
1417 global, global->index, global->ctx );*/
1418
1419 // Mark as detached, since the global reaction is being removed
1420 // from the global reaction list.
1421 global->attached = false;
1422
1423 direct_list_remove( &reactor->globals, &global->link );
1424 }
1425 }
1426
1427 /* Unlock the list of global reactions. */
1428 fusion_skirmish_dismiss( lock );
1429 }
1430
1431
1432
1433 /*****************************
1434 * File internal functions *
1435 *****************************/
1436
1437 static ReactorNode *
lock_node(int reactor_id,bool add_it,bool wlock,FusionReactor * reactor,FusionWorld * world)1438 lock_node( int reactor_id, bool add_it, bool wlock, FusionReactor *reactor, FusionWorld *world )
1439 {
1440 DirectLink *n;
1441 ReactorNode *node;
1442 FusionWorldShared *shared;
1443
1444 D_DEBUG_AT( Fusion_Reactor, " lock_node( [%d], add %s, reactor %p )\n",
1445 reactor_id, add_it ? "true" : "false", reactor );
1446
1447 D_ASSERT( reactor != NULL || (!add_it && world != NULL) );
1448
1449 if (reactor) {
1450 D_MAGIC_ASSERT( reactor, FusionReactor );
1451
1452 shared = reactor->shared;
1453
1454 D_MAGIC_ASSERT( shared, FusionWorldShared );
1455
1456 world = _fusion_world( shared );
1457
1458 D_MAGIC_ASSERT( world, FusionWorld );
1459 }
1460 else {
1461 D_MAGIC_ASSERT( world, FusionWorld );
1462
1463 shared = world->shared;
1464
1465 D_MAGIC_ASSERT( shared, FusionWorldShared );
1466 }
1467
1468
1469 direct_mutex_lock( &world->reactor_nodes_lock );
1470
1471 direct_list_foreach_safe (node, n, world->reactor_nodes) {
1472 D_MAGIC_ASSERT( node, ReactorNode );
1473
1474 if (node->reactor_id == reactor_id) {
1475 if (wlock) {
1476 DirectLink *n;
1477 NodeLink *link;
1478
1479 pthread_rwlock_wrlock( &node->lock );
1480
1481 /* FIXME: don't cleanup asynchronously */
1482 direct_list_foreach_safe (link, n, node->links) {
1483 D_MAGIC_ASSERT( link, NodeLink );
1484
1485 if (!link->reaction) {
1486 D_DEBUG_AT( Fusion_Reactor, " -> cleaning up %p\n", link );
1487
1488 remove_node_link( node, link );
1489 }
1490 else
1491 D_ASSERT( link->reaction->node_link == link );
1492 }
1493 }
1494 else
1495 pthread_rwlock_rdlock( &node->lock );
1496
1497 /* FIXME: Don't cleanup asynchronously. */
1498 if (!node->links && !add_it) {
1499 // D_DEBUG_AT( Fusion_Reactor, " -> cleaning up mine %p\n", node );
1500
1501 direct_list_remove( &world->reactor_nodes, &node->link );
1502
1503 pthread_rwlock_unlock( &node->lock );
1504 pthread_rwlock_destroy( &node->lock );
1505
1506 D_MAGIC_CLEAR( node );
1507
1508 D_FREE( node );
1509
1510 node = NULL;
1511 }
1512 else {
1513 /* D_DEBUG_AT( Fusion_Reactor, " -> found %p (%d reactions)\n",
1514 node, direct_list_count_elements_EXPENSIVE( node->reactions ) );*/
1515
1516 D_ASSERT( node->reactor == reactor || reactor == NULL );
1517
1518 direct_list_move_to_front( &world->reactor_nodes, &node->link );
1519 }
1520
1521 direct_mutex_unlock( &world->reactor_nodes_lock );
1522
1523 return node;
1524 }
1525
1526 /* FIXME: Don't cleanup asynchronously. */
1527 if (!pthread_rwlock_trywrlock( &node->lock )) {
1528 if (!node->links) {
1529 // D_DEBUG_AT( Fusion_Reactor, " -> cleaning up other %p\n", node );
1530
1531 direct_list_remove( &world->reactor_nodes, &node->link );
1532
1533 pthread_rwlock_unlock( &node->lock );
1534 pthread_rwlock_destroy( &node->lock );
1535
1536 D_MAGIC_CLEAR( node );
1537
1538 D_FREE( node );
1539 }
1540 else {
1541 /*D_DEBUG_AT( Fusion_Reactor, " -> keeping other %p (%d reactions)\n",
1542 node, direct_list_count_elements_EXPENSIVE( node->reactions ) );*/
1543
1544 pthread_rwlock_unlock( &node->lock );
1545 }
1546 }
1547 }
1548
1549 // D_DEBUG_AT( Fusion_Reactor, " -> not found%s adding\n", add_it ? ", but" : " and not" );
1550
1551 if (add_it) {
1552 D_MAGIC_ASSERT( reactor, FusionReactor );
1553
1554 node = D_CALLOC( 1, sizeof(ReactorNode) );
1555 if (!node) {
1556 D_OOM();
1557 return NULL;
1558 }
1559
1560 //direct_util_recursive_pthread_mutex_init( &node->lock );
1561 pthread_rwlock_init( &node->lock, NULL );
1562
1563
1564 if (wlock)
1565 pthread_rwlock_wrlock( &node->lock );
1566 else
1567 pthread_rwlock_rdlock( &node->lock );
1568
1569 node->reactor_id = reactor_id;
1570 node->reactor = reactor;
1571
1572 D_MAGIC_SET( node, ReactorNode );
1573
1574 direct_list_prepend( &world->reactor_nodes, &node->link );
1575
1576 direct_mutex_unlock( &world->reactor_nodes_lock );
1577
1578 return node;
1579 }
1580
1581 direct_mutex_unlock( &world->reactor_nodes_lock );
1582
1583 return NULL;
1584 }
1585
1586 static void
unlock_node(ReactorNode * node)1587 unlock_node( ReactorNode *node )
1588 {
1589 D_ASSERT( node != NULL );
1590
1591 // D_MAGIC_ASSERT( node->reactor, FusionReactor );
1592
1593 /* D_DEBUG_AT( Fusion_Reactor, " unlock_node( %p, reactor %p [%d] )\n",
1594 node, node->reactor, node->reactor->id );*/
1595
1596 pthread_rwlock_unlock( &node->lock );
1597 }
1598
1599 #else /* FUSION_BUILD_MULTI */
1600
1601 /***************************
1602 * Internal declarations *
1603 ***************************/
1604
1605 /*
1606 *
1607 */
1608 struct __Fusion_FusionReactor {
1609 DirectLink *reactions; /* reactor listeners attached to node */
1610 pthread_mutex_t reactions_lock;
1611
1612 DirectLink *globals; /* global reactions attached to node */
1613 pthread_mutex_t globals_lock;
1614
1615 bool destroyed;
1616
1617 int msg_size;
1618 };
1619
1620 static void
1621 process_globals( FusionReactor *reactor,
1622 const void *msg_data,
1623 const ReactionFunc *globals );
1624
1625 /****************
1626 * Public API *
1627 ****************/
1628
1629 FusionReactor *
fusion_reactor_new(int msg_size,const char * name,const FusionWorld * world)1630 fusion_reactor_new( int msg_size,
1631 const char *name,
1632 const FusionWorld *world )
1633 {
1634 FusionReactor *reactor;
1635
1636 reactor = D_CALLOC( 1, sizeof(FusionReactor) );
1637 if (!reactor)
1638 return NULL;
1639
1640 reactor->msg_size = msg_size;
1641
1642 direct_util_recursive_pthread_mutex_init( &reactor->reactions_lock );
1643 direct_util_recursive_pthread_mutex_init( &reactor->globals_lock );
1644
1645 return reactor;
1646 }
1647
1648 DirectResult
fusion_reactor_set_lock(FusionReactor * reactor,FusionSkirmish * lock)1649 fusion_reactor_set_lock( FusionReactor *reactor,
1650 FusionSkirmish *lock )
1651 {
1652 D_ASSERT( reactor != NULL );
1653 D_ASSERT( lock != NULL );
1654
1655 // D_UNIMPLEMENTED();
1656
1657 return DR_UNIMPLEMENTED;
1658 }
1659
1660 DirectResult
fusion_reactor_set_lock_only(FusionReactor * reactor,FusionSkirmish * lock)1661 fusion_reactor_set_lock_only( FusionReactor *reactor,
1662 FusionSkirmish *lock )
1663 {
1664 D_ASSERT( reactor != NULL );
1665 D_ASSERT( lock != NULL );
1666
1667 return DR_UNIMPLEMENTED;
1668 }
1669
1670 DirectResult
fusion_reactor_attach(FusionReactor * reactor,ReactionFunc func,void * ctx,Reaction * reaction)1671 fusion_reactor_attach (FusionReactor *reactor,
1672 ReactionFunc func,
1673 void *ctx,
1674 Reaction *reaction)
1675 {
1676 return fusion_reactor_attach_channel( reactor, 0, func, ctx, reaction );
1677 }
1678
1679 DirectResult
fusion_reactor_detach(FusionReactor * reactor,Reaction * reaction)1680 fusion_reactor_detach (FusionReactor *reactor,
1681 Reaction *reaction)
1682 {
1683 D_ASSERT( reactor != NULL );
1684 D_ASSERT( reaction != NULL );
1685
1686 pthread_mutex_lock( &reactor->reactions_lock );
1687
1688 direct_list_remove( &reactor->reactions, &reaction->link );
1689
1690 pthread_mutex_unlock( &reactor->reactions_lock );
1691
1692 return DR_OK;
1693 }
1694
1695 DirectResult
fusion_reactor_attach_global(FusionReactor * reactor,int index,void * ctx,GlobalReaction * reaction)1696 fusion_reactor_attach_global (FusionReactor *reactor,
1697 int index,
1698 void *ctx,
1699 GlobalReaction *reaction)
1700 {
1701 D_ASSERT( reactor != NULL );
1702 D_ASSERT( index >= 0 );
1703 D_ASSERT( reaction != NULL );
1704
1705 reaction->index = index;
1706 reaction->ctx = ctx;
1707 // Mark the reaction as attached now.
1708 reaction->attached = true;
1709
1710 pthread_mutex_lock( &reactor->globals_lock );
1711
1712 direct_list_prepend( &reactor->globals, &reaction->link );
1713
1714 pthread_mutex_unlock( &reactor->globals_lock );
1715
1716 return DR_OK;
1717 }
1718
1719 DirectResult
fusion_reactor_detach_global(FusionReactor * reactor,GlobalReaction * reaction)1720 fusion_reactor_detach_global (FusionReactor *reactor,
1721 GlobalReaction *reaction)
1722 {
1723 D_ASSERT( reactor != NULL );
1724 D_ASSERT( reaction != NULL );
1725
1726 pthread_mutex_lock( &reactor->globals_lock );
1727
1728 // Check to prevent multiple detaches from being performed.
1729 if (reaction->attached) {
1730 // Mark as detached, since the global reaction is being removed
1731 // from the global reaction list.
1732 reaction->attached = false;
1733
1734 // Remove the reaction from the list.
1735 direct_list_remove( &reactor->globals, &reaction->link );
1736 }
1737
1738 pthread_mutex_unlock( &reactor->globals_lock );
1739
1740 return DR_OK;
1741 }
1742
1743 DirectResult
fusion_reactor_attach_channel(FusionReactor * reactor,int channel,ReactionFunc func,void * ctx,Reaction * reaction)1744 fusion_reactor_attach_channel( FusionReactor *reactor,
1745 int channel,
1746 ReactionFunc func,
1747 void *ctx,
1748 Reaction *reaction )
1749 {
1750 D_ASSERT( reactor != NULL );
1751 D_ASSERT( func != NULL );
1752 D_ASSERT( reaction != NULL );
1753
1754 reaction->func = func;
1755 reaction->ctx = ctx;
1756 reaction->node_link = (void*)(long) channel;
1757
1758 pthread_mutex_lock( &reactor->reactions_lock );
1759
1760 direct_list_prepend( &reactor->reactions, &reaction->link );
1761
1762 pthread_mutex_unlock( &reactor->reactions_lock );
1763
1764 return DR_OK;
1765 }
1766
1767 DirectResult
fusion_reactor_dispatch_channel(FusionReactor * reactor,int channel,const void * msg_data,int msg_size,bool self,const ReactionFunc * globals)1768 fusion_reactor_dispatch_channel( FusionReactor *reactor,
1769 int channel,
1770 const void *msg_data,
1771 int msg_size,
1772 bool self,
1773 const ReactionFunc *globals )
1774 {
1775 DirectLink *l;
1776
1777 D_ASSERT( reactor != NULL );
1778 D_ASSERT( msg_data != NULL );
1779
1780 if (reactor->globals) {
1781 if (globals)
1782 process_globals( reactor, msg_data, globals );
1783 else
1784 D_ERROR( "Fusion/Reactor: global reactions exist but no "
1785 "globals have been passed to dispatch()\n" );
1786 }
1787
1788 if (!self)
1789 return DR_OK;
1790
1791 pthread_mutex_lock( &reactor->reactions_lock );
1792
1793 l = reactor->reactions;
1794 while (l) {
1795 DirectLink *next = l->next;
1796 Reaction *reaction = (Reaction*) l;
1797
1798 if ((long) reaction->node_link == channel) {
1799 switch (reaction->func( msg_data, reaction->ctx )) {
1800 case RS_REMOVE:
1801 direct_list_remove( &reactor->reactions, l );
1802 break;
1803
1804 case RS_DROP:
1805 pthread_mutex_unlock( &reactor->reactions_lock );
1806 return DR_OK;
1807
1808 default:
1809 break;
1810 }
1811 }
1812
1813 l = next;
1814 }
1815
1816 pthread_mutex_unlock( &reactor->reactions_lock );
1817
1818 return DR_OK;
1819 }
1820
1821 DirectResult
fusion_reactor_set_dispatch_callback(FusionReactor * reactor,FusionCall * call,void * call_ptr)1822 fusion_reactor_set_dispatch_callback( FusionReactor *reactor,
1823 FusionCall *call,
1824 void *call_ptr )
1825 {
1826 D_UNIMPLEMENTED();
1827
1828 return DR_UNIMPLEMENTED;
1829 }
1830
1831 DirectResult
fusion_reactor_set_name(FusionReactor * reactor,const char * name)1832 fusion_reactor_set_name( FusionReactor *reactor,
1833 const char *name )
1834 {
1835 D_UNIMPLEMENTED();
1836
1837 return DR_UNIMPLEMENTED;
1838 }
1839
1840 DirectResult
fusion_reactor_dispatch(FusionReactor * reactor,const void * msg_data,bool self,const ReactionFunc * globals)1841 fusion_reactor_dispatch (FusionReactor *reactor,
1842 const void *msg_data,
1843 bool self,
1844 const ReactionFunc *globals)
1845 {
1846 return fusion_reactor_dispatch_channel( reactor, 0, msg_data, reactor->msg_size, self, globals );
1847 }
1848
1849 DirectResult
fusion_reactor_direct(FusionReactor * reactor,bool direct)1850 fusion_reactor_direct( FusionReactor *reactor, bool direct )
1851 {
1852 D_ASSERT( reactor != NULL );
1853
1854 return DR_OK;
1855 }
1856
1857 DirectResult
fusion_reactor_destroy(FusionReactor * reactor)1858 fusion_reactor_destroy (FusionReactor *reactor)
1859 {
1860 D_ASSERT( reactor != NULL );
1861
1862 D_ASSUME( !reactor->destroyed );
1863
1864 reactor->destroyed = true;
1865
1866 return DR_OK;
1867 }
1868
1869 DirectResult
fusion_reactor_free(FusionReactor * reactor)1870 fusion_reactor_free (FusionReactor *reactor)
1871 {
1872 D_ASSERT( reactor != NULL );
1873
1874 // D_ASSUME( reactor->destroyed );
1875
1876 reactor->reactions = NULL;
1877 pthread_mutex_destroy( &reactor->reactions_lock );
1878
1879 reactor->globals = NULL;
1880 pthread_mutex_destroy( &reactor->globals_lock );
1881
1882 D_FREE( reactor );
1883
1884 return DR_OK;
1885 }
1886
1887 /******************************************************************************/
1888
1889 static void
process_globals(FusionReactor * reactor,const void * msg_data,const ReactionFunc * globals)1890 process_globals( FusionReactor *reactor,
1891 const void *msg_data,
1892 const ReactionFunc *globals )
1893 {
1894 DirectLink *n;
1895 GlobalReaction *global;
1896 int max_index = -1;
1897
1898 D_ASSERT( reactor != NULL );
1899 D_ASSERT( msg_data != NULL );
1900 D_ASSERT( globals != NULL );
1901
1902 while (globals[max_index+1])
1903 max_index++;
1904
1905 if (max_index < 0)
1906 return;
1907
1908 pthread_mutex_lock( &reactor->globals_lock );
1909
1910 direct_list_foreach_safe (global, n, reactor->globals) {
1911 if (global->index < 0 || global->index > max_index) {
1912 D_WARN( "global reaction index out of bounds (%d/%d)", global->index, max_index );
1913 }
1914 else {
1915 // Remove the reaction if requested.
1916 if (globals[ global->index ]( msg_data, global->ctx ) == RS_REMOVE) {
1917 // Mark as detached, since the global reaction is being
1918 // removed from the global reaction list.
1919 global->attached = false;
1920
1921 direct_list_remove( &reactor->globals, &global->link );
1922 }
1923 }
1924 }
1925
1926 pthread_mutex_unlock( &reactor->globals_lock );
1927 }
1928
1929 DirectResult
fusion_reactor_add_permissions(FusionReactor * reactor,FusionID fusion_id,FusionReactorPermissions permissions)1930 fusion_reactor_add_permissions( FusionReactor *reactor,
1931 FusionID fusion_id,
1932 FusionReactorPermissions permissions )
1933 {
1934 return DR_OK;
1935 }
1936
1937 #endif
1938
1939