1 /*
2  * The Spread Toolkit.
3  *
4  * The contents of this file are subject to the Spread Open-Source
5  * License, Version 1.0 (the ``License''); you may not use
6  * this file except in compliance with the License.  You may obtain a
7  * copy of the License at:
8  *
9  * http://www.spread.org/license/
10  *
11  * or in the file ``license.txt'' found in this distribution.
12  *
13  * Software distributed under the License is distributed on an AS IS basis,
14  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
15  * for the specific language governing rights and limitations under the
16  * License.
17  *
18  * The Creators of Spread are:
19  *  Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
20  *
21  *  Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
22  *
23  *  All Rights Reserved.
24  *
25  * Major Contributor(s):
26  * ---------------
27  *    Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
28  *    Theo Schlossnagle    jesus@omniti.com - Perl, skiplists, autoconf.
29  *    Dan Schoenblum       dansch@cnds.jhu.edu - Java interface.
30  *    John Schultz         jschultz@cnds.jhu.edu - contribution to process group membership.
31  *
32  */
33 
34 
35 #include <assert.h>
36 #include "arch.h"
37 #include "spread_params.h"
38 #include "network.h"
39 #include "net_types.h"
40 #include "data_link.h"
41 #include "sp_events.h"
42 #include "status.h"
43 #include "alarm.h"
44 #include "configuration.h"
45 
46 /* for Memb_print_form_token() */
47 #include "membership.h"
48 
/* Receive channels, filled in by Net_init (one entry per DL_init_channel call). */
static	channel		Bcast_channel[MAX_INTERFACES_PROC];
static  channel		Token_channel[MAX_INTERFACES_PROC];
/* Single channel used by every DL_send call in this file (packets and tokens). */
static	channel		Send_channel;

/* Number of valid entries in Bcast_channel[] and Token_channel[]. */
static  int             Num_bcast_channels;
static  int             Num_token_channels;

/*
 * Local-segment broadcast. Bcast_needed is set by Net_init to 1 when my
 * segment in the configuration holds more than one proc, 0 otherwise.
 */
static	int		Bcast_needed;
static	int32		Bcast_address;
static	int16		Bcast_port;

/*
 * Destinations in the other segments, rebuilt by Net_set_membership:
 * for each other segment with at least one member, the id of its first
 * proc and the segment port.
 */
static	int		Num_send_needed;
static  int32		Send_address[MAX_SEGMENTS];
static	int16		Send_ports[MAX_SEGMENTS];

/* ### Pack: 3 lines */
/* File scope (not function-local) so both Net_queue_bcast and Net_flush_bcast can access them */
static  sys_scatter     Queue_scat;
static  int             Queued_bytes = 0;
/* Source bytes for alignment padding elements; exactly 4 chars, no terminating NUL. */
static  const char      align_padding[4] = "padd";

/* address for token sending - which is always needed */
static	int32		Token_address;
static	int16		Token_port;

/* Copy of the membership last passed to Net_set_membership. */
static	configuration	Net_membership;
/* 1 when I am at index 0 of my segment in Net_membership, 0 otherwise. */
static	int		Segment_leader;

/* Snapshots taken in Net_init (Cn, My) and Net_set_membership (My_seg). */
static 	configuration	Cn;
static	proc		My;
static	segment 	My_seg;

/*
 * Monitor-imposed partition: Partition[i] is the component number of proc i.
 * Procs are considered reachable only when their entries are equal
 * (see In_my_component). Clear_partition resets every entry to 0.
 * Partition_timeout is the delay handed to E_queue for that reset
 * (presumably 60 seconds -- confirm against sp_time).
 */
static	int16		Partition[MAX_PROCS_RING];
static	sp_time		Partition_timeout 	= { 60, 0};
/* Index returned by Conf_proc_by_id for My.id; used to index Partition[]. */
static	int		My_index;

static	void		Clear_partition(int dummy, void *dummy_p);
static	int		In_my_component( int32	proc_id );
static	void		Flip_pack( packet_header *pack_ptr );
static	void		Flip_token( token_header *token_ptr );
90 
Net_init()91 void	Net_init()
92 {
93 	proc		dummy_proc;
94         int32u          interface_addr;
95         int             i;
96         bool            bcast_bound = FALSE;
97 
98 	Cn = Conf();
99 	My = Conf_my();
100 
101 	My_index = Conf_proc_by_id( My.id, &dummy_proc );
102 	Clear_partition(0, NULL);
103 
104 	if( Cn.segments[My.seg_index].num_procs > 1 )
105 	{
106 		/* I am not allone in segment */
107 		Bcast_needed  = 1;
108 		Bcast_address = Cn.segments[My.seg_index].bcast_address;
109 		Bcast_port    = My.port;
110 
111 		Alarm( NETWORK, "Net_init: Bcast needed to address (%d, %d)\n",
112 			Bcast_address, Bcast_port );
113 	}else{
114 		Bcast_needed  = 0;
115 		Bcast_address = 0;
116 		Alarm( NETWORK, "Net_init: Bcast is not needed\n" );
117 	}
118 
119         /* To receive broadcast (and possibly multicast) packets on a socket
120          * bound to a specific interface, we also have to bind to the broadcast
121          * address on the interface as well as the unicast interface. That is
122          * what the double bind of the bcast_address does.
123          */
124         for ( i=0; i < My.num_if; i++)
125         {
126                 if (Is_IfType_Daemon( My.ifc[i].type ) || Is_IfType_Any( My.ifc[i].type ) )
127                 {
128                         if (Is_IfType_Any( My.ifc[i].type ) )
129                                 interface_addr = 0;
130                         else {
131                                 interface_addr = My.ifc[i].ip;
132                                 if (Bcast_needed && !bcast_bound) {
133                                     Bcast_channel[Num_bcast_channels++] = DL_init_channel( RECV_CHANNEL, My.port, Bcast_address, Bcast_address );
134                                     bcast_bound = TRUE;
135                                 }
136                         }
137                         Bcast_channel[Num_bcast_channels++] = DL_init_channel( RECV_CHANNEL, My.port, Bcast_address, interface_addr );
138                         Token_channel[Num_token_channels++] = DL_init_channel( RECV_CHANNEL, My.port+1, 0, interface_addr );
139                 }
140         }
141 
142 	Send_channel  = DL_init_channel( SEND_CHANNEL, My.port+2, 0, My.id );
143 
144 	Num_send_needed = 0;
145 }
146 
/*
 * Net_set_membership: install a new membership and derive from it
 *   - whether I am the leader (index 0) of my segment,
 *   - the list of (address, port) pairs used to reach the other segments
 *     (first proc of every other segment that has members),
 *   - the address and port to which I forward the token.
 *
 * memb is copied into Net_membership. Exits through Alarm( EXIT ) when I am
 * not a member of my own segment in memb.
 *
 * The segment count is read from Conf() once, before the loop: Conf()
 * returns the configuration by value, so calling it in the loop condition
 * copied the whole structure on every iteration.
 */
void	Net_set_membership( configuration memb )
{
	int	i;
	int	num_segments;
	int	my_index_in_seg;
	int	my_next_index;

	Net_membership = memb;
	My_seg = Net_membership.segments[My.seg_index];
	my_index_in_seg = Conf_id_in_seg( &My_seg, My.id );
	if( my_index_in_seg < 0 )
		/* Conf_print's return value feeds %c; presumably it prints the
		 * membership as a side effect -- confirm against configuration.h */
		Alarm( EXIT,"Net_set_membership: I am not in membership %c",
			Conf_print( &Net_membership ) );
	else if( my_index_in_seg == 0) {
		Segment_leader = 1;
		Alarm( NETWORK,"Net_set_membership: I am a Segment leader\n");
	} else  Segment_leader = 0;

	Num_send_needed = 0;
	my_next_index = -1;
	num_segments = Conf().num_segments;
	for( i=0; i < num_segments; i++ )
	{
	    if( i == My.seg_index )
	    {
		/*
		 * This is my segment.
		 * There is no need to send to it
		 * but I need my_next_index to calculate
		 * the token send address.
		 */
		my_next_index = Num_send_needed;

	    } else if( Net_membership.segments[i].num_procs > 0 ) {

		Send_address[Num_send_needed] = Net_membership.segments[i].procs[0]->id;
		Send_ports  [Num_send_needed] = Net_membership.segments[i].port;

		Num_send_needed++;
	    }
	}
	/* my own segment must have been visited by the loop above */
	assert(my_next_index != -1);
	for( i=0; i < Num_send_needed; i++ )
		Alarm( NETWORK,
			"Net_set_membership: Send_addr[%d] is (%d,%d)\n",
			i, Send_address[i], Send_ports[i] );

	/* Calculate where to send the token (token port is segment port + 1) */
	Token_address = 0;
	if( my_index_in_seg < My_seg.num_procs-1 )
	{
		/* next proc in my own segment */
		Token_address = My_seg.procs[my_index_in_seg+1]->id;
		Token_port    = My_seg.port+1;
	}else{
		/* I am last in my segment */
		if( Num_send_needed == 0 )
		{
			/*
			 * My segment is the only segment
			 * sending token to the first in my segment
			 */
			Token_address = My_seg.procs[0]->id;
			Token_port    = My_seg.port+1;
		} else if( Num_send_needed == my_next_index ) {
			/*
			 * My segment is the last segment
			 * sending token to the first in first valid segment
			 */
			Token_address = Send_address[0];
			Token_port    = Send_ports[0]+1;
		} else {
			/*
			 * There is a valid segment after mine
			 * sending token to the first in next valid segment
			 */
			Token_address = Send_address[my_next_index];
			Token_port    = Send_ports[my_next_index]+1;
		}

	}
	Alarm( NETWORK, "Net_set_membership: Token_address : (%d, %d)\n",
			Token_address, Token_port );
}
228 
/*
 * Net_bcast: send one packet to everybody in the current membership.
 *
 * The packet header (first scatter element) is stamped with my endianness
 * and my id. The packet is sent with the routed bit set to the first proc
 * of every other segment, then with the routed bit cleared to the local
 * broadcast address when broadcast is needed.
 *
 * Returns the result of the last DL_send performed, or 1 when no send at
 * all was required.
 */
int	Net_bcast( sys_scatter *scat )
{
	packet_header	*hdr = (packet_header *)scat->elements[0].buf;
	int		seg = 0;
	int		status = 0;

	/* copies that cross segments travel with the routed bit set */
	hdr->type = Set_routed( hdr->type );
	hdr->type = Set_endian( hdr->type );
	hdr->transmiter_id = My.id;

	while( seg < Num_send_needed )
	{
		status = DL_send( Send_channel, Send_address[seg], Send_ports[seg], scat );
		seg++;
	}
	hdr->type = Clear_routed( hdr->type );

	if( Bcast_needed )
	{
		/* local segment copy, according to configuration */
		status = DL_send( Send_channel, Bcast_address, Bcast_port, scat );
	}
	else if( Num_send_needed == 0 )
	{
		/* nothing had to go on the wire; the packet counts as sent */
		status = 1;
	}

	return( status );
}
258 
259 /* ### Pack: 2 routines */
/*
 * Net_queue_bcast: append one packet to the pending packed broadcast.
 *
 * The scatter elements of scat are referenced (not copied) from Queue_scat.
 * Each packet after the first starts on a 4 byte boundary; the gap is
 * filled with a padding element. When the new packet would exceed
 * MAX_PACKET_SIZE bytes or ARCH_SCATTER_SIZE elements, the pending data is
 * flushed first and the packet starts a new queue.
 *
 * Returns the result of Net_flush_bcast when a flush happened, 0 otherwise.
 */
int	Net_queue_bcast( sys_scatter *scat )
{
	packet_header	*first_hdr;
	int		incoming_bytes = 0;
	int		remainder;
	int		pad_bytes;
	int		pad_elements;
	int		flush_status = 0;
	int		src, dst;

	/* Redundant after static initialization, kept for safety */
	if ( Queued_bytes == 0 ) Queue_scat.num_elements = 0;

	for ( src = 0; src < scat->num_elements; src++ )
		incoming_bytes += scat->elements[src].len;

	/*
	 * Padding needed so the new packet begins on a 4 byte boundary.
	 * Needed for Sparc; other archs with stricter alignment rules
	 * might need more.
	 */
	remainder = Queued_bytes % 4;
	pad_bytes = ( remainder > 0 ) ? ( 4 - remainder ) : 0;
	pad_elements = ( pad_bytes > 0 ) ? 1 : 0;

	if ( ( (Queued_bytes + incoming_bytes + pad_bytes)  >  MAX_PACKET_SIZE ) ||
	     ( (Queue_scat.num_elements + scat->num_elements + pad_elements) > ARCH_SCATTER_SIZE ) )
	{
		/* does not fit: send what is pending, start a new queue unpadded */
		flush_status = Net_flush_bcast();
		pad_bytes = 0;
		pad_elements = 0;
	}

	if ( Queued_bytes == 0 )
	{
		/* first packet of the queue carries the routing/endian stamp */
		first_hdr = (packet_header *)scat->elements[0].buf;
		first_hdr->type  = Set_routed( first_hdr->type );
		first_hdr->type  = Set_endian( first_hdr->type );
		first_hdr->transmiter_id = My.id;
	}

	if ( pad_bytes > 0 )
	{
		dst = Queue_scat.num_elements;
		Queue_scat.elements[dst].len = pad_bytes;
		Queue_scat.elements[dst].buf = (char *)align_padding;

		Queued_bytes += pad_bytes;
		Queue_scat.num_elements += 1;
		Alarm(NETWORK, "Net_queue_bcast: Inserted padding of %d bytes to message of size %d\n", pad_bytes, incoming_bytes );
	}

	/* Reference the new packet's elements from the tail of Queue_scat */
	dst = Queue_scat.num_elements;
	for ( src = 0; src < scat->num_elements; src++ )
	{
		Queue_scat.elements[dst].len = scat->elements[src].len;
		Queue_scat.elements[dst].buf = scat->elements[src].buf;
		dst++;
	}
	Queued_bytes += incoming_bytes;
	Queue_scat.num_elements += scat->num_elements;

	return( flush_status );
}
332 
Net_flush_bcast(void)333 int     Net_flush_bcast(void)
334 {
335         packet_header   *pack_ptr;
336         int             i;
337         int             ret;
338 
339         if (Queued_bytes == 0 ) return( 0 );
340 
341         Alarm(NETWORK, "Net_flush_bcast: Flushing with Queued_bytes = %d; num_elements in scat = %d; size of scat0,1 = %d %d\n", Queued_bytes, Queue_scat.num_elements, Queue_scat.elements[0].len, Queue_scat.elements[1].len);
342 
343         ret = 0;
344 
345         for ( i=0; i< Num_send_needed; i++ )
346         {
347                 ret = DL_send( Send_channel, Send_address[i], Send_ports[i], &Queue_scat );
348         }
349         pack_ptr = (packet_header *)Queue_scat.elements[0].buf;
350 	pack_ptr->type = Clear_routed( pack_ptr->type );
351 
352         /* broadcasting if needed according to configuration */
353         if( Bcast_needed )
354         {
355                 ret = DL_send( Send_channel, Bcast_address, Bcast_port, &Queue_scat );
356         }
357 
358         if( !Bcast_needed && (Num_send_needed == 0) )
359             ret = 1; /* No actual send is needed, but 'packet' can be considered 'sent' */
360 
361         Queue_scat.num_elements = 0;
362         Queued_bytes = 0;
363         return( ret );
364 }
365 
/*
 * Net_scast: send one packet to a single segment.
 *
 * For my own segment the packet goes to the local broadcast address (when
 * broadcast is needed). For another segment it goes, with the routed bit
 * set, to the first proc of that segment in the current membership.
 *
 * Returns the DL_send result, or 1 when no network send was required
 * (the packet can then be considered sent).
 */
int	Net_scast( int16 seg_index, sys_scatter *scat )
{
	packet_header	*hdr = (packet_header *)scat->elements[0].buf;
	int		status;

	hdr->type = Set_endian( hdr->type );
	hdr->transmiter_id = My.id;

	if( seg_index == My.seg_index )
	{
		/* my own segment: plain local broadcast, if anybody else is there */
		if( !Bcast_needed )
			return( 1 );
		return( DL_send( Send_channel, Bcast_address, Bcast_port, scat ) );
	}

	/* remote segment without members: nothing to send */
	if( Net_membership.segments[seg_index].num_procs <= 0 )
		return( 1 );

	/* the routed bit is set only for the duration of the send */
	hdr->type = Set_routed( hdr->type );
	status = DL_send( Send_channel,
			Net_membership.segments[seg_index].procs[0]->id,
			Net_membership.segments[seg_index].port,
			scat );
	hdr->type = Clear_routed( hdr->type );

	return( status );
}
401 
/*
 * Net_ucast: send one packet to a single proc.
 *
 * The header is stamped with my endianness and my id, then the packet is
 * sent to proc_id on the port recorded for it in the configuration.
 *
 * Returns the (negative) Conf_proc_by_id result when proc_id is unknown,
 * otherwise the DL_send result.
 */
int	Net_ucast( int32 proc_id, sys_scatter *scat )
{
	packet_header	*hdr = (packet_header *)scat->elements[0].buf;
	proc		target;
	int		lookup;

	hdr->type = Set_endian( hdr->type );
	hdr->transmiter_id = My.id;

	lookup = Conf_proc_by_id( proc_id, &target );
	if( lookup < 0 )
	{
		Alarm( PRINT, "Net_ucast: non existing proc_id %d\n",proc_id );
		return( lookup );
	}

	return( DL_send( Send_channel, proc_id, target.port, scat ) );
}
420 
/* Round a byte count up to the next multiple of 4 (packed packets are 4 byte aligned). */
static	int	Net_align4( int bytes )
{
	int	remainder = bytes % 4;

	if( remainder > 0 ) bytes += 4 - remainder;
	return( bytes );
}

/*
 * Net_recv: receive one (possibly packed) packet from a broadcast channel.
 *
 * fd must be one of the channels opened by Net_init, otherwise the daemon
 * exits. scat->elements[0] receives the first packet header and
 * scat->elements[1] the rest of the datagram; every header found is
 * converted to my byte order.
 *
 * Handling, in order:
 *   - monitor partition messages update Partition[] (only when the
 *     dangerous monitor state is enabled) and are consumed here;
 *   - packets from another monitor component, and my own packets, are
 *     dropped;
 *   - routed packets are re-broadcast on my segment when broadcast is
 *     needed, with the routed bit cleared and my id as transmitter;
 *   - the chain of packed headers is validated against the number of
 *     bytes received.
 *
 * Returns:
 *   the DL_recv result when it is <= 0,
 *   -1 for a malformed packet (too short, inconsistent lengths),
 *    0 for a packet that was consumed or dropped here,
 *   otherwise the number of bytes received.
 *
 * Lengths come from the network and are not trusted: a negative data_len
 * or a trailing header that does not fit in the received bytes is rejected
 * before it is used to compute an offset into the buffer.
 */
int	Net_recv ( channel fd, sys_scatter *scat )
{
static	scatter		save;
	packet_header	*pack_ptr;
	int		bytes_left;
	int		received_bytes, body_offset;
	int		processed_bytes;
	int		pack_same_endian;
	int		i;
	bool		ch_found;

	pack_ptr = (packet_header *)scat->elements[0].buf;

	ch_found = FALSE;
	for (i = 0 ; i < Num_bcast_channels; i++) {
	    if ( fd == Bcast_channel[i]) {
		ch_found = TRUE;
		break;
	    }
	}
	if (ch_found == FALSE) {
	    Alarm(EXIT, "Net_recv: Listening and received packet on un-used interface %d\n", fd);
	}

	received_bytes = DL_recv( fd, scat );

	if( received_bytes <= 0 ) return( received_bytes );

	if( received_bytes < (int) sizeof( packet_header ) )
	{
		/* sizeof yields size_t; cast so the argument matches %d */
		Alarm(PRINT, "Net_recv: ignoring packet of size %d, smaller than packet header size %d\n",
			received_bytes, (int) sizeof(packet_header) );
		return( -1 );
	}

	/* Flipping packet header to my form if needed */
	if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );

	if( Is_partition( pack_ptr->type ) )
	{
		/* Monitor : updating partition */
		int16	*cur_partition;

		if ( ! Conf_get_dangerous_monitor_state() ) {
			Alarm( PRINT, "Net_recv: Request to set partition or kill daemons from (%d.%d.%d.%d) denied. Monitor in safe mode\n", IP1(pack_ptr->proc_id), IP2(pack_ptr->proc_id), IP3(pack_ptr->proc_id), IP4(pack_ptr->proc_id) );
			return( 0 );
		}

		/* accepted only with the monitor magic value and a sender from the configuration */
		if( ! ( pack_ptr->memb_id.proc_id == 15051963 && Conf_id_in_conf( &Cn, pack_ptr->proc_id ) != -1  ) ) return( 0 );

		cur_partition = (int16 *)scat->elements[1].buf;

		for( i=0; i < Conf_num_procs( &Cn ); i++ )
		    if( Same_endian( pack_ptr->type ) ) Partition[i] = cur_partition[i];
		    else Partition[i] = Flip_int16( cur_partition[i] );

		/* the partition is undone after Partition_timeout */
		E_queue( Clear_partition, 0, NULL, Partition_timeout );
		if( Partition[My_index] < 0 )
		    Alarm( EXIT, "Net_recv: Instructed to exit by monitor\n");
		Alarm( PRINT  , "Net_recv: Got monitor message, component %d\n",
			Partition[My_index] );

		return( 0 );
	}

	/* Monitor : dropping packet from other monitor components */
	if( ! ( pack_ptr->memb_id.proc_id == 15051963 || In_my_component( pack_ptr->transmiter_id ) ) )
		return( 0 );

	/* no need to return my own packets */
	if( pack_ptr->transmiter_id == My.id )
		return( 0 );

	if( Bcast_needed && Is_routed( pack_ptr->type ) )
	{
		if( !Segment_leader ) Alarm( NETWORK,
		"Net_recv: recv routed message from %d but not seg leader\n",
			pack_ptr->proc_id);

		/* saving scat lens for another DL_recv */
		save.num_elements = scat->num_elements;
		for( i=0; i < save.num_elements; i++ )
			save.elements[i].len = scat->elements[i].len;

		/* computing true scat lens for sending */
		bytes_left = received_bytes;
		i = 0;
		while ( bytes_left > 0 )
		{
			if( bytes_left < scat->elements[i].len )
				scat->elements[i].len = bytes_left;
			bytes_left -=  scat->elements[i].len;
			i ++;
		}
		scat->num_elements = i;

		pack_ptr->type = Clear_routed ( pack_ptr->type );
		pack_ptr->transmiter_id = My.id;

		/* flipping to original form */
		if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );
		DL_send( Send_channel, Bcast_address, Bcast_port, scat );
		/* re-flipping to my form */
		if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );

		/* restoring scat lens for another DL_recv */
		scat->num_elements = save.num_elements;
		for( i=0; i < save.num_elements; i++ )
			scat->elements[i].len = save.elements[i].len;

	}
	/*
	 * we clear routed anyway in order not to ask if Bcast_needed again.
	 * This way, if bcast is not needed we give it to the upper layer
	 * right away. It will always get to the upper layer with this
	 * bit cleared.
	 */
	pack_ptr->type = Clear_routed ( pack_ptr->type );

	/*
	 * Check validity of packet size and flip every packet header
	 * other than first header (which is already flipped).
	 * If packet size is not valid, return -1, otherwise
	 * return size of received packet.
	 */
	if( pack_ptr->data_len < 0 )
	{
		/* a negative length would place the next header before the buffer
		 * (only possible if data_len is a signed type -- TODO confirm) */
		Alarm( PRINT, "Net_recv: ignoring packet with negative data length %d\n",
			(int) pack_ptr->data_len );
		return( -1 );
	}
	processed_bytes = sizeof( packet_header ) + pack_ptr->data_len;
	pack_same_endian = Same_endian( pack_ptr->type );
	/* ignore any alignment padding */
	if ( processed_bytes < received_bytes )
		processed_bytes = Net_align4( processed_bytes );

	while( processed_bytes < received_bytes )
	{
		/* the next header must lie completely inside the received bytes */
		if( received_bytes - processed_bytes < (int) sizeof( packet_header ) )
		{
			Alarm( PRINT, "Net_recv: ignoring packet of length %d with truncated packed header at offset %d\n",
				received_bytes, processed_bytes );
			return( -1 );
		}

		body_offset = processed_bytes - sizeof(packet_header);
		pack_ptr = (packet_header *)&scat->elements[1].buf[body_offset];

		/* flip contiguous packet header */
		if( !pack_same_endian  ) {
			Flip_pack( pack_ptr );
		}

		if( pack_ptr->data_len < 0 )
		{
			Alarm( PRINT, "Net_recv: ignoring packet with negative packed data length %d\n",
				(int) pack_ptr->data_len );
			return( -1 );
		}

		processed_bytes += sizeof( packet_header ) + pack_ptr->data_len;
		/* ignore any alignment padding */
		if ( processed_bytes < received_bytes )
			processed_bytes = Net_align4( processed_bytes );
	}
	Alarm( NETWORK, "Net_recv: Received Packet - packet length(%d), packed message length(%d)\n", received_bytes, processed_bytes);
	if( processed_bytes != received_bytes ) {
		Alarm( PRINT, "Net_recv: Received Packet - packet length(%d) != packed message length(%d)\n", received_bytes, processed_bytes);
		return( -1 );
	}
	return( received_bytes );
}
597 
/*
 * Net_send_token: forward the token to the address computed by
 * Net_set_membership.
 *
 * The token header is stamped with my endianness and my id. A token whose
 * rtr_len does not fit in a packet is reported through Alarmp with
 * SPLOG_FATAL (form tokens are printed first).
 *
 * Returns the DL_send result.
 */
int	Net_send_token( sys_scatter *scat )
{
	token_header	*tok = (token_header *)scat->elements[0].buf;

	tok->type = Set_endian( tok->type );
	tok->transmiter_id = My.id;

	if ( tok->rtr_len > (MAX_PACKET_SIZE - sizeof(token_header) ) )
	{
		/* dump the form token for diagnosis before the fatal report */
		if ( Is_form( tok->type ) )
		{
			Memb_print_form_token( scat );
		}
		Alarmp( SPLOG_FATAL, PRINT, "Net_send_token: Token too long for packet!\n");
	}

	return( DL_send( Send_channel, Token_address, Token_port, scat ) );
}
617 
/*
 * Net_recv_token: receive one token from a token channel.
 *
 * fd must be one of the token channels opened by Net_init, otherwise the
 * daemon exits. The token header (scat->elements[0]) is converted to my
 * byte order.
 *
 * Returns:
 *   the DL_recv result when it is <= 0,
 *   0 when the token is dropped here (shorter than a token header, or
 *     sent by a proc outside my monitor component),
 *   otherwise the number of bytes received.
 *
 * A datagram shorter than a token header is dropped before its header is
 * read: the header fields would otherwise come from stale buffer contents.
 */
int	Net_recv_token( channel fd, sys_scatter *scat )
{
	token_header	*token_ptr;
	int		ret, i;
	bool		ch_found;

	token_ptr = (token_header *)scat->elements[0].buf;

	ch_found = FALSE;
	for (i = 0 ; i < Num_token_channels; i++) {
	    if ( fd == Token_channel[i]) {
		ch_found = TRUE;
		break;
	    }
	}
	if (ch_found == FALSE) {
	    Alarm(EXIT, "Net_recv_token: Listening and received packet on un-used interface %d\n", fd);
	}

	ret = DL_recv( fd, scat );

	if( ret <= 0 ) return( ret );

	if( ret < (int) sizeof( token_header ) )
	{
		Alarm( PRINT, "Net_recv_token: ignoring token of size %d, smaller than token header size %d\n",
			ret, (int) sizeof( token_header ) );
		/* 0 is the value this function already uses for a dropped token */
		return( 0 );
	}

	/* Flipping token header to my form if needed */
	if( !Same_endian( token_ptr->type ) ) Flip_token( token_ptr );

	/* Monitor : dropping token from other monitor components */
	if( !In_my_component( token_ptr->transmiter_id ) )
		return( 0 );

	return ( ret );
}
650 
/*
 * Net_ucast_token: send a token directly to one proc.
 *
 * The token header is stamped with my endianness and my id, then sent to
 * proc_id on its configured port + 1. A token whose rtr_len does not fit
 * in a packet is printed and reported through Alarmp with SPLOG_FATAL.
 *
 * Returns the (negative) Conf_proc_by_id result when proc_id is unknown,
 * otherwise the DL_send result.
 */
int	Net_ucast_token( int32 proc_id, sys_scatter *scat )
{
	token_header	*tok = (token_header *)scat->elements[0].buf;
	proc		target;
	int		lookup;

	tok->type = Set_endian( tok->type );
	tok->transmiter_id = My.id;

	lookup = Conf_proc_by_id( proc_id, &target );
	if( lookup < 0 )
	{
		Alarm( PRINT, "Net_ucast_token: non existing proc_id %d\n",
			proc_id );
		return( lookup );
	}

	if ( tok->rtr_len > (MAX_PACKET_SIZE - sizeof(token_header) ) )
	{
		Memb_print_form_token( scat );
		Alarmp( SPLOG_FATAL, PRINT, "Net_ucast_token: Token too long for packet!\n");
	}

	/* tokens are received on the port following the packet port */
	return( DL_send( Send_channel, proc_id, target.port+1, scat ) );
}
676 
Net_num_channels(int * num_bcast,int * num_token)677 void     Net_num_channels(int *num_bcast, int *num_token)
678 {
679     *num_bcast = Num_bcast_channels;
680     *num_token = Num_token_channels;
681 }
682 
Net_bcast_channel()683 channel *Net_bcast_channel()
684 {
685 	return( &(Bcast_channel[0]) );
686 }
687 
Net_token_channel()688 channel *Net_token_channel()
689 {
690 	return( &(Token_channel[0]) );
691 }
692 
Clear_partition(int dummy,void * dummy_p)693 static	void	Clear_partition(int dummy, void *dummy_p)
694 {
695 	int	i;
696 
697 	for( i=0; i < Conf_num_procs( &Cn ); i++ )
698 		Partition[i] = 0;
699 }
700 
/*
 * In_my_component: tell whether proc_id is in the same monitor component
 * as I am.
 *
 * Returns 1 when the Partition entries of proc_id and of myself are equal,
 * 0 when they differ or when proc_id is not in the configuration (the
 * latter is also logged).
 */
static	int	In_my_component( int32	proc_id )
{
	proc	unused_proc;
	char	ip_str[16];
	int	other_index;

	other_index = Conf_proc_by_id( proc_id, &unused_proc );
	if( other_index >= 0 )
		return( Partition[My_index] == Partition[other_index] );

	/* unknown sender: never part of my component */
	Conf_id_to_str( proc_id, ip_str );
	Alarm( PRINT, "In_my_component: unknown proc %s\n", ip_str );
	return( 0 );
}
717 
/*
 * Flip_pack: reverse the byte order of every field of a packet header,
 * in place. Applying it twice restores the original header (Net_recv
 * relies on this when forwarding a routed packet).
 *
 * Declared static to match its forward declaration at the top of the file.
 */
static	void	Flip_pack( packet_header *pack_ptr )
{
	pack_ptr->type		  = Flip_int32( pack_ptr->type );
	pack_ptr->transmiter_id	  = Flip_int32( pack_ptr->transmiter_id );
	pack_ptr->proc_id	  = Flip_int32( pack_ptr->proc_id );
	pack_ptr->memb_id.proc_id = Flip_int32( pack_ptr->memb_id.proc_id );
	pack_ptr->memb_id.time	  = Flip_int32( pack_ptr->memb_id.time );
	pack_ptr->seq		  = Flip_int32( pack_ptr->seq );
	pack_ptr->fifo_seq	  = Flip_int32( pack_ptr->fifo_seq );
	pack_ptr->packet_index	  = Flip_int16( pack_ptr->packet_index );
	pack_ptr->data_len	  = Flip_int16( pack_ptr->data_len );
}
730 
/*
 * Flip_token: reverse the byte order of every field of a token header,
 * in place.
 *
 * Declared static to match its forward declaration at the top of the file.
 */
static	void	Flip_token( token_header *token_ptr )
{
	token_ptr->type		 = Flip_int32( token_ptr->type );
	token_ptr->transmiter_id = Flip_int32( token_ptr->transmiter_id );
	token_ptr->seq		 = Flip_int32( token_ptr->seq );
	token_ptr->proc_id	 = Flip_int32( token_ptr->proc_id );
	token_ptr->aru		 = Flip_int32( token_ptr->aru );
	token_ptr->aru_last_id	 = Flip_int32( token_ptr->aru_last_id );
	token_ptr->flow_control	 = Flip_int16( token_ptr->flow_control );
	token_ptr->rtr_len	 = Flip_int16( token_ptr->rtr_len );
}
742