1 /*
2  * The Spread Toolkit.
3  *
4  * The contents of this file are subject to the Spread Open-Source
5  * License, Version 1.0 (the ``License''); you may not use
6  * this file except in compliance with the License.  You may obtain a
7  * copy of the License at:
8  *
9  * http://www.spread.org/license/
10  *
11  * or in the file ``license.txt'' found in this distribution.
12  *
13  * Software distributed under the License is distributed on an AS IS basis,
14  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
15  * for the specific language governing rights and limitations under the
16  * License.
17  *
18  * The Creators of Spread are:
19  *  Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
20  *
21  *  Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
22  *
23  *  All Rights Reserved.
24  *
25  * Major Contributor(s):
26  * ---------------
27  *    Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
28  *    Theo Schlossnagle    jesus@omniti.com - Perl, skiplists, autoconf.
29  *    Dan Schoenblum       dansch@cnds.jhu.edu - Java interface.
30  *    John Schultz         jschultz@cnds.jhu.edu - contribution to process group membership.
31  *
32  */
33 
34 
35 #include "arch.h"
36 
37 #include <string.h>
38 #include <stdlib.h>
39 #include <stdio.h>
40 #include <assert.h>
41 
42 #ifndef ARCH_PC_WIN95
43 
44 #include <errno.h>
45 #include <unistd.h>
46 #include <sys/types.h>
47 #include <sys/socket.h>
48 #include <sys/stat.h>
49 
50 #ifdef HAVE_SYS_UIO_H
51 #include <sys/uio.h>
52 #endif
53 
54 #include <netinet/in.h>
55 #include <netinet/tcp.h>
56 #include <sys/un.h>
57 #include <signal.h>
58 #include <sys/ioctl.h>
59 
60 #else   /* ARCH_PC_WIN95 */
61 
62 #include <winsock.h>
63 #define	ioctl 	ioctlsocket
64 
65 #endif  /* ARCH_PC_WIN95 */
66 
67 #include "spread_params.h"
68 #include "net_types.h"
69 #include "sp_events.h"
70 #include "objects.h"
71 #include "memory.h"
72 #include "session.h"
73 #include "sess_types.h"
74 
75 #define ext_sess_body
76 #include "sess_body.h"
77 #undef  ext_sess_body
78 
79 #include "groups.h"
80 #include "log.h"
81 #include "status.h"
82 #include "alarm.h"
83 #include "objects.h"
84 #include "prot_body.h"
85 #if     ( SPREAD_PROTOCOL > 3 )
86 #include "queues.h"
87 #endif
88 #include "message.h"
89 #include "acm.h"
90 
91 static	sp_time		Badger_timeout = { 0, 100000 };
92 
93 static	message_obj	New_mess;
94 
95 static  int             Accept_inet_mbox_num;
96 static	mailbox		Accept_inet_mbox[MAX_INTERFACES_PROC];
97 static	mailbox		Accept_unix_mbox;
98 
99 static	int		Protocol_threshold;
100 
101 #define SESSION_FD_HASH_SIZE    256
102 static	session		*Sessions_hash_head[SESSION_FD_HASH_SIZE];
103 static	session		*Sessions_head;
104 static	session		*Sessions_tail;
105 static	session		*Sessions_free;
106 
107 static	void	Sess_attach_accept(void);
108 static	void	Sess_detach_accept(void);
109 static  void    Sess_recv_client_auth(mailbox mbox, int dummy, void *dummy_p);
110 static	void    Sess_accept( mailbox mbox, int domain, void *dummy );
111 static	void	Sess_accept_continue( mailbox, int, void * );
112 static	void    Sess_read( mailbox mbox, int domain, void *dummy );
113 static	void	Sess_badger( mailbox mbox, void *dummy );
114 static	void	Sess_kill( mailbox mbox );
115 static	void	Sess_handle_join( message_link *mess_link );
116 static	void	Sess_handle_leave( message_link *mess_link );
117 static	void	Sess_handle_kill( message_link *mess_link );
118 static  void    Sess_deliver_reject( message_obj *msg );
119 static  void    Sess_create_reject_message ( message_obj *msg );
120 static  int     Sess_get_p2p_dests( int num_groups, char groups[][MAX_GROUP_NAME], char dests[][MAX_GROUP_NAME] );
121 
/*
 * ACTIVATE_PORT_REUSE(mbox): enable SO_REUSEADDR on socket `mbox`.
 *
 * When setsockopt() fails an Alarm( EXIT, ... ) is raised.
 * The argument is parenthesized, and the helper variable carries a
 * macro-private name so that it cannot shadow an identifier appearing in
 * the argument expression.
 */
#define ACTIVATE_PORT_REUSE(mbox) do { \
    int port_reuse_on_ = 1; \
    if (setsockopt((mbox), SOL_SOCKET, SO_REUSEADDR, (void *) &port_reuse_on_, sizeof(port_reuse_on_)) < 0) \
        Alarm( EXIT, "Sess_init: Error setting SO_REUSEADDR socket option\n" ); \
} while (0)
127 
Sess_get_session_index(int mbox)128 int	Sess_get_session_index (int mbox)
129 {
130     session *tmp;
131     unsigned char *c = (unsigned char *) &mbox;
132     unsigned int i;
133 
134     i = c[0] ^ c[1] ^ c[2] ^ c[3];
135 
136     Alarm( NONE, "Sess_get_session_index: mbox %d hashed to %u\n", mbox, i);
137     for (tmp = Sessions_hash_head[i]; tmp; tmp = tmp->hash_next)
138         if (tmp->mbox == mbox)
139             return (tmp - Sessions);
140 
141     return -1;
142 }
143 
/*
 * Enter `ses` into the mailbox hash table.
 *
 * The bucket is the XOR of the first four bytes of ses->mbox; the session
 * is pushed at the front of that bucket's chain.
 */
static  void    Sess_hash_session (session *ses)
{
    const unsigned char *bytes  = (const unsigned char *) &ses->mbox;
    unsigned int         bucket = bytes[0] ^ bytes[1] ^ bytes[2] ^ bytes[3];

    ses->hash_next             = Sessions_hash_head[bucket];
    Sessions_hash_head[bucket] = ses;
}
154 
/*
 * Remove `ses` from the mailbox hash table.
 *
 * The bucket is derived from ses->mbox exactly as in Sess_hash_session(),
 * so the mailbox must still hold the value it had when the session was
 * hashed.  On success the session is unlinked from its chain and its
 * hash_next is cleared.
 *
 * If the session is not found in its bucket (empty bucket, or session never
 * hashed) the table is left untouched and the condition is reported,
 * instead of walking off the end of the chain.
 */
static  void    Sess_unhash_session (session *ses)
{
    unsigned int i;
    unsigned char *c;
    session **link;

    c = (unsigned char *) &ses->mbox;
    i = c[0] ^ c[1] ^ c[2] ^ c[3];

    /* `link` always points at the pointer that would have to be rewritten
     * to drop the current chain element, so head and interior removals
     * are handled by the same code. */
    for (link = &Sessions_hash_head[i]; *link != NULL; link = &(*link)->hash_next)
    {
        if (*link == ses)
        {
            *link = ses->hash_next;
            ses->hash_next = NULL;
            return;
        }
    }

    Alarm( PRINT, "Sess_unhash_session: session on mailbox %d is not in the hash table\n", ses->mbox );
}
175 
Sess_get_free_session(void)176 static	session	*Sess_get_free_session (void)
177 {
178     session *ses;
179 
180     if ((ses = Sessions_free) == NULL)
181     {
182         Alarm (EXIT, "Sess_get_free_session: BUG ! No free sessions !\n");
183     }
184 
185     Sessions_free = Sessions_free->sort_next;
186 
187     return ses;
188 }
189 
/*
 * Push `ses` onto the front of the free list.  Only sort_next is rewritten;
 * every other field of the slot is left as it was.
 */
static	void	Sess_free_session (session *ses)
{
    session *old_first = Sessions_free;

    Sessions_free  = ses;
    ses->sort_next = old_first;
}
195 
/*
 * Take a free slot, copy *template into it and link it into the session
 * list.
 *
 *   where != NULL : the new session is placed immediately before `where`
 *   where == NULL : the new session is appended at the tail
 *
 * The whole template is copied first, so its list pointers are copied too;
 * sort_prev and sort_next are then overwritten here.
 *
 * Returns the index of the new session inside Sessions[].
 */
static	int	Sess_insert_new_session (session *where, session *template)
{
    session *node = Sess_get_free_session();
    session *before;

    memmove(node, template, sizeof (*template));

    if (where != NULL)
    {
        /* splice in front of `where` */
        before = where->sort_prev;

        node->sort_next  = where;
        node->sort_prev  = before;
        where->sort_prev = node;

        if (before != NULL)
            before->sort_next = node;
        else
            Sessions_head = node;       /* `where` used to be the head */
    }
    else if (Sessions_tail != NULL)
    {
        /* append behind the current tail */
        node->sort_next = NULL;
        node->sort_prev = Sessions_tail;
        Sessions_tail->sort_next = node;
        Sessions_tail = node;
    }
    else
    {
        /* first element of an empty list */
        node->sort_next = NULL;
        node->sort_prev = NULL;
        Sessions_head = Sessions_tail = node;
    }

    return(node - Sessions);
}
242 
/*
 * Unlink `ses` from the session list and return its slot to the free list.
 *
 * Each side is repaired independently: a missing neighbour means that
 * `ses` was the head (respectively the tail), so the corresponding list
 * end pointer is moved instead of a neighbour link.
 */
static	void	Sess_remove_session (session *ses)
{
    session *before = ses->sort_prev;
    session *after  = ses->sort_next;

    if (before != NULL)
        before->sort_next = after;
    else
        Sessions_head = after;

    if (after != NULL)
        after->sort_prev = before;
    else
        Sessions_tail = before;

    Sess_free_session(ses);
}
279 
Sess_init_sessions(void)280 static	void	Sess_init_sessions (void)
281 {
282     int i;
283 
284     for (i = 0; i < SESSION_FD_HASH_SIZE; i++)
285         Sessions_hash_head[i] = NULL;
286 
287     Sessions_free = Sessions_head = Sessions_tail = NULL;
288 
289     for (i = 0; i < MAX_SESSIONS; i++)
290         Sess_free_session( &Sessions[i] );
291 }
292 
/*
 * Count the bits of `field` that are set in the half-open bit range
 * [first_index, last_index).
 *
 *   first_index : lowest bit position examined, 0..31
 *   last_index  : one past the highest bit position examined, 0..32,
 *                 and not smaller than first_index
 *
 * The three range requirements are enforced with assert().
 * Returns the number of set bits in the range (0 when the range is empty).
 */
int     count_bits_set( int32u field, int first_index, int last_index)
{
        int i;
        int count = 0;
        int32u bitfield;

        assert(first_index >= 0 && first_index < 32);
        assert(last_index >= 0 && last_index <=32);
        assert(last_index >= first_index);

        /* Build the mask in the unsigned type: shifting the int constant 1
         * into bit 31 would be a signed overflow. */
        bitfield = ((int32u) 1) << first_index;
        for ( i = first_index; i < last_index; i++, bitfield <<= 1)
        {
                if (field & bitfield )
                        count++;
        }
        return(count);
}
310 
Sess_init()311 void	Sess_init()
312 {
313 	struct	sockaddr_in	inet_addr;
314 	int16			port;
315 	int			ret, i;
316 	mailbox			mbox;
317 
318 #ifndef ARCH_PC_WIN95
319 
320 	struct	sockaddr_un	unix_addr;
321 	char			name[80];
322 
323 	signal( SIGPIPE, SIG_IGN );
324 
325 #endif	/* ARCH_PC_WIN95 */
326 
327         ret = Mem_init_object( MESSAGE_LINK, sizeof(message_link), 1000, 0);
328         if (ret < 0)
329         {
330                 Alarm(EXIT, "Sess_init: Failure to Initialize MESSAGE_LINK memory objects\n");
331         }
332 
333         ret = Mem_init_object( DOWN_LINK, sizeof(down_link), 200, 0);
334         if (ret < 0)
335         {
336                 Alarm(EXIT, "Sess_Init: Failure to Initialize DOWN_LINK memory objects\n");
337         }
338 
339 	Sess_init_sessions ();
340 
341 	Num_sessions = 0;
342 	GlobalStatus.num_sessions = Num_sessions;
343 	GlobalStatus.message_delivered = 0;
344 	My 	 = Conf_my();
345 	port 	 = My.port;
346 	Session_threshold = LOW_PRIORITY;
347 
348 	/* Initializing the protocol */
349 	Protocol_threshold = LOW_PRIORITY;
350 
351         Prot_init_down_queues();
352         Prot_set_down_queue( NORMAL_DOWNQUEUE );
353         Prot_init();
354 
355 	/* Initiation of the INET socket */
356         memset(&inet_addr.sin_zero, 0, sizeof(inet_addr.sin_zero));
357 
358 	inet_addr.sin_family	= AF_INET;
359 	inet_addr.sin_port	= htons(port);
360         Accept_inet_mbox_num = 0;
361 
362         /* Bind to all interfaces specified in config file */
363         for ( i=0; i < My.num_if; i++)
364         {
365                 if (Is_IfType_Client(My.ifc[i].type) || Is_IfType_Any(My.ifc[i].type) )
366                 {
367                         port_reuse type;
368                         if( (mbox = socket( AF_INET, SOCK_STREAM, 0 ) ) == -1)
369                                 Alarm( EXIT, "Sess_init: INET sock error\n" );
370                         type = Conf_get_port_reuse_type();
371                         if (type == port_reuse_on)
372                                 ACTIVATE_PORT_REUSE(mbox);
373 
374                         if (Is_IfType_Any(My.ifc[i].type) )
375                                 inet_addr.sin_addr.s_addr = INADDR_ANY;
376                         else
377                         {
378                                 if (type == port_reuse_auto)
379                                         ACTIVATE_PORT_REUSE(mbox);
380                                 inet_addr.sin_addr.s_addr = htonl(My.ifc[i].ip);
381                         }
382                         if( bind( mbox,  (struct sockaddr *)&inet_addr, sizeof(inet_addr) ) == -1)
383                         {
384                                 Alarm( PRINT, "Sess_init: INET unable to bind to port %d, already running \n" ,port );
385                                 exit(0);
386                         }
387                         inet_addr.sin_addr.s_addr = ntohl(inet_addr.sin_addr.s_addr);
388                         Alarm( SESSION, "Sess_init: INET bind for port %d interface %d.%d.%d.%d ok\n", port,
389                                IP1(inet_addr.sin_addr.s_addr), IP2(inet_addr.sin_addr.s_addr),
390                                IP3(inet_addr.sin_addr.s_addr), IP4(inet_addr.sin_addr.s_addr) );
391 
392                         if( listen( mbox, 25 ) < 0 )
393                                 Alarm( EXIT, "Sess_init: INET unable to listen\n" );
394 
395                         Accept_inet_mbox[Accept_inet_mbox_num] = mbox;
396                         Accept_inet_mbox_num++;
397                         Alarm( SESSION, "Sess_init: INET went ok on mailbox %d\n", mbox );
398                 }
399         }
400 
401 #ifndef ARCH_PC_WIN95
402 
403 	/* Initiation of the UNIX socket */
404 
405 	if( (mbox = socket( AF_UNIX, SOCK_STREAM, 0 ) ) == -1)
406 	    Alarm( EXIT, "Sess_init: UNIX sock error\n" );
407 
408 	unix_addr.sun_family	= AF_UNIX;
409 	sprintf( name, "%s/spread.sock", _PATH_SPREAD_PIDDIR );
410 	strcpy( unix_addr.sun_path, name );
411 	unlink( name );
412 
413 	if( bind( mbox, (struct sockaddr *)&unix_addr, sizeof(unix_addr) ) == -1)
414 	{
415 		Alarm( PRINT, "Sess_init: UNIX unable to bind to name %s, already running \n" , name );
416 		exit(0);
417 	}
418 	Alarm( SESSION, "Sess_init: UNIX bind for name %s ok\n", name );
419 
420 	chmod( name, 0666 );
421 
422 	if( listen( mbox, 5 ) < 0 )
423 	    Alarm( EXIT, "Sess_init: UNIX unable to listen\n" );
424 
425 	Accept_unix_mbox = mbox;
426         Alarm( SESSION, "Sess_init: UNIX went ok on mailbox %d\n", mbox );
427 
428 #endif	/* ARCH_PC_WIN95 */
429 
430 	Sess_attach_accept();
431 
432         Message_populate_with_buffers(&New_mess);
433 
434 	G_init();
435 
436         Alarm( SESSION, "Sess_init: ended ok\n" );
437 }
438 
Sess_set_active_threshold()439 void	Sess_set_active_threshold()
440 {
441 	/* This function is used only by the session (and groups) layer */
442 
443 	if( Protocol_threshold > Session_threshold )
444 		E_set_active_threshold( Protocol_threshold );
445 	else 	E_set_active_threshold( Session_threshold );
446 }
447 
Sess_block_user(int xxx)448 void    Sess_block_user(int xxx)
449 {
450 
451         Alarm(EXIT,"Sess_block_user: NOT IMPLEMENTED!\n");
452 }
453 
Sess_unblock_user(int xxx)454 void    Sess_unblock_user(int xxx)
455 {
456 
457         Alarm(EXIT,"Sess_unblock_user: NOT IMPLEMENTED!\n");
458 }
Sess_block_users_level()459 void	Sess_block_users_level()
460 {
461 	/* This function is used only by lower layers (protocol) */
462 	if( Protocol_threshold < MEDIUM_PRIORITY )
463 	{
464 		Protocol_threshold = MEDIUM_PRIORITY;
465 		Sess_set_active_threshold();
466 	}
467 }
468 
Sess_unblock_users_level()469 void	Sess_unblock_users_level()
470 {
471 	/* This function is used only by lower layers (protocol) */
472 	if( Protocol_threshold > LOW_PRIORITY )
473 	{
474 		Protocol_threshold = LOW_PRIORITY;
475 		Sess_set_active_threshold();
476 	}
477 }
478 
Sess_attach_accept()479 static	void	Sess_attach_accept()
480 {
481         int i;
482         for ( i=0; i < Accept_inet_mbox_num; i++)
483         {
484                 E_attach_fd( Accept_inet_mbox[i], READ_FD, Sess_accept, AF_INET, NULL, LOW_PRIORITY );
485                 E_attach_fd( Accept_inet_mbox[i], EXCEPT_FD, Sess_accept, AF_INET, NULL, LOW_PRIORITY );
486         }
487 #ifndef ARCH_PC_WIN95
488 
489 	E_attach_fd( Accept_unix_mbox, READ_FD, Sess_accept, AF_UNIX, NULL, LOW_PRIORITY );
490 
491 #endif	/* ARCH_PC_WIN95 */
492 
493 }
494 
Sess_detach_accept()495 static	void	Sess_detach_accept()
496 {
497         int i;
498         for (i=0; i < Accept_inet_mbox_num; i++)
499         {
500                 E_detach_fd( Accept_inet_mbox[i], READ_FD );
501                 E_detach_fd( Accept_inet_mbox[i], EXCEPT_FD );
502         }
503 #ifndef ARCH_PC_WIN95
504 
505 	E_detach_fd( Accept_unix_mbox, READ_FD );
506 
507 #endif	/* ARCH_PC_WIN95 */
508 
509 }
510 
/*
 * Adapter with the (int, void *) signature that Sess_accept() queues through
 * E_queue().  Both arguments are ignored; it simply runs
 * Sess_accept_continue() with zero/NULL arguments.
 */
void	Sess_accept_continue2(int d1, void *d2)
{
	(void) d1;
	(void) d2;

	Sess_accept_continue(0,0,NULL);
}
515 
516 
/*
 * First stage of accepting a client connection.
 *
 *   mbox   : listening socket that became ready
 *   domain : AF_INET or AF_UNIX, as registered by Sess_attach_accept();
 *            any other value raises Alarm( EXIT, ... )
 *   dummy  : unused
 *
 * The connection is accepted into the scratch slot Sessions[MAX_SESSIONS].
 * When Num_sessions has reached MAX_SESSIONS the client is sent
 * REJECT_QUOTA and dropped.  Otherwise the socket buffers are grown step by
 * step, TCP_NODELAY is set on INET connections, and the listeners are
 * detached while the new socket waits for the client's connect message:
 * Sess_accept_continue() runs when the socket becomes readable (or has an
 * exception), or after one second through Sess_accept_continue2(),
 * whichever comes first.
 */
static	void	Sess_accept( mailbox mbox, int domain, void *dummy )
{
	struct	sockaddr_in	inet_addr;
	socklen_t		inet_len;
        sockopt_len_t           onlen;
	sp_time			accept_delay;
	char			response;
	int			ret;
	int			i;

	int32			on;

	if( domain == AF_INET )
	{
		/* Cleared first so that the address read below is well defined
		 * even when accept() fails and fills nothing in. */
		memset( &inet_addr, 0, sizeof(inet_addr) );
		inet_len = sizeof(inet_addr);
		Sessions[MAX_SESSIONS].mbox = accept( mbox, (struct sockaddr *)&inet_addr, &inet_len );

		Sessions[MAX_SESSIONS].type = AF_INET;
		/*
		 * sender's machine ip address is: ntohl(inet_addr.sin_addr.s_addr)
		 * sender's assigned port is     : ntohs(inet_addr.sin_port)
		 */
		Sessions[MAX_SESSIONS].address = ntohl(inet_addr.sin_addr.s_addr);
	}else if( domain == AF_UNIX ){
		/* no need for return values for AF_UNIX on the accept */
		Sessions[MAX_SESSIONS].mbox = accept( mbox, 0, 0 );

		Sessions[MAX_SESSIONS].type = AF_UNIX;
		Sessions[MAX_SESSIONS].address = 0;
	}else Alarm( EXIT, "Sess_accept: Unknown domain %d on mailbox %d\n", domain, mbox );

	if( Sessions[MAX_SESSIONS].mbox < 0 )
	{
		Alarm( SESSION, "Sess_accept: accept failed for domain %d\n", domain );
		return;
	}
	if( Num_sessions == MAX_SESSIONS )
	{
		response = REJECT_QUOTA;
		send( Sessions[MAX_SESSIONS].mbox, &response, sizeof(response), 0 );
		close( Sessions[MAX_SESSIONS].mbox );
		Alarm( SESSION, "Sess_accept: rejecting session due to quota\n" );
		return;
	}

        if ( ( (i = Sess_get_session_index(Sessions[MAX_SESSIONS].mbox)) != -1)
             && (Sessions[i].mbox == Sessions[MAX_SESSIONS].mbox)
             && (Is_op_session( Sessions[i].status )) )
        {
            /* This is impossible as the mbox must have been closed to be returned by accept */
            Alarm(EXIT, "Sess_accept: BUG! Accepted new FD %d that is currently in use(ses %d).\n", Sessions[i].mbox, i);
        }

	/* Grow the send/receive buffers from 10 KB to 200 KB in 5 KB steps,
	 * stopping at the first size that cannot be set, cannot be read back,
	 * or reads back smaller than requested. */
	for( i=10; i <= 200; i+=5 )
	{
	    on = 1024*i;
	    onlen = sizeof(on);

 	    ret = setsockopt( Sessions[MAX_SESSIONS].mbox, SOL_SOCKET, SO_SNDBUF, (void *)&on, onlen);
	    if (ret < 0 ) break;

	    ret = setsockopt( Sessions[MAX_SESSIONS].mbox, SOL_SOCKET, SO_RCVBUF, (void *)&on, onlen);
	    if (ret < 0 ) break;

	    onlen = sizeof(on);
	    ret= getsockopt( Sessions[MAX_SESSIONS].mbox, SOL_SOCKET, SO_SNDBUF, (void *)&on, &onlen );
	    if( ret < 0 || on < i*1024 ) break;
	    Alarm( NONE, "Sess_accept: set sndbuf %d, ret is %d\n", on, ret );

	    onlen = sizeof(on);
	    ret= getsockopt( Sessions[MAX_SESSIONS].mbox, SOL_SOCKET, SO_RCVBUF, (void *)&on, &onlen );
	    if( ret < 0 || on < i*1024 ) break;
	    Alarm( NONE, "Sess_accept: set rcvbuf %d, ret is %d\n", on, ret );
	}
	/* (i-5) is the last step that passed every check above */
	Alarm( SESSION, "Sess_accept: set sndbuf/rcvbuf to %d\n", 1024*(i-5) );

        if ( domain == AF_INET ) {
                on = 1;
                ret = setsockopt( Sessions[MAX_SESSIONS].mbox, IPPROTO_TCP, TCP_NODELAY, (void *)&on, sizeof(on));
                if (ret < 0)
                        Alarm(PRINT, "Setting TCP_NODELAY failed with  error: %s\n", sock_strerror(sock_errno));
                else
                        Alarm( SESSION, "Setting TCP_NODELAY on socket %d\n", Sessions[MAX_SESSIONS].mbox );
        }
	/* delaying for the private name to be written */
	Sess_detach_accept();
	E_attach_fd( Sessions[MAX_SESSIONS].mbox, READ_FD, Sess_accept_continue, 0, NULL, LOW_PRIORITY );
	E_attach_fd( Sessions[MAX_SESSIONS].mbox, EXCEPT_FD, Sess_accept_continue, 0, NULL, LOW_PRIORITY );
	accept_delay.sec = 1;
	accept_delay.usec= 0;
	E_queue( Sess_accept_continue2, 1, NULL, accept_delay );
}
607 
/*
 * Second stage of accepting a client: read the connect message from the
 * socket waiting in the scratch slot Sessions[MAX_SESSIONS], validate it,
 * create the session and start authentication.
 *
 * The three arguments are unused; the pending connection is always taken
 * from the scratch slot.
 *
 * connect message looks like:
 *
 * byte - version of lib
 * byte - subversion of lib
 * (optional) byte - patchversion of lib (only if version.subversion > 3.14)
 * byte - 1/0 with or without groups
 * byte - len of name
 * len bytes - name
 *
 * Every validation failure ends at the single exit label, which closes the
 * pending socket; where a reject code exists it is sent to the client first.
 */
void	Sess_accept_continue(mailbox d1, int d2, void *d3)
{
	session			*pend = &Sessions[MAX_SESSIONS];   /* connection being set up */
	session			*new_ses;
	session			*tmp_ses;
	char			response;
	char                    version[3];
	char			conn[2];
	char                    priv_user_name[MAX_PRIVATE_NAME];
	char                    *allowed_auth_list;
	unsigned char           list_len;
	int			legal_private_name;
	int			unique_private_name;
	int			name_len;
	int			ioctl_cmd;
	int			lib_version;
	int			ret, i, sess_location, rnum;

	/* Cancel the timer and the fd handlers installed by Sess_accept() and
	 * start listening for further connections again. */
	E_dequeue( Sess_accept_continue2, 1, NULL );
	E_detach_fd( pend->mbox, READ_FD );
	E_detach_fd( pend->mbox, EXCEPT_FD );
	Sess_attach_accept();

	/* set file descriptor to non blocking */
	ioctl_cmd = 1;
	ret = ioctl( pend->mbox, FIONBIO, &ioctl_cmd);

	/* version checking  3.01 is minimal */
	memset( version, 0, sizeof(version) );

	ret = recv( pend->mbox, version, 2, 0 );
	if( ret < 0 )
	{
		Alarm( SESSION, "Sess_accept_continue: reading version.subversion failed on mailbox %d\n",
			pend->mbox );
		goto drop_connection;
	}

	lib_version = version[0]*10000 + version[1]*100 + version[2];
	if( lib_version > 31400 )
	{
		/* these libraries send a third, patch-level byte */
		ret = recv( pend->mbox, &version[2], 1, 0 );
		if( ret < 0 )
		{
			Alarm( SESSION, "Sess_accept_continue: reading patch_version failed on mailbox %d\n",
				pend->mbox );
			goto drop_connection;
		}
		lib_version = version[0]*10000 + version[1]*100 + version[2];
	}
	if( lib_version < 30100 )
	{
		response = REJECT_VERSION;
		send( pend->mbox, &response, sizeof(response), 0 );
		Alarm( SESSION, "Sess_accept_continue: version %d.%d.%d is not supported\n",
			version[0], version[1], version[2] );
		goto drop_connection;
	}

	for( i=0; i < 3; i++ )
		pend->lib_version[i] = version[i];

	/* membership/priority byte and name length byte */
	ret = recv( pend->mbox, conn, 2, 0 );
	if( ret < 0 )
	{
		Alarm( SESSION, "Sess_accept_continue: reading private name failed on mailbox %d\n",
			pend->mbox );
		goto drop_connection;
	}
	if( ret < 2 )
	{
		response = REJECT_NO_NAME;
		send( pend->mbox, &response, sizeof(response), 0 );
		Alarm( SESSION, "Sess_accept_continue: reading private name failed on mailbox %d\n",
			pend->mbox );
		goto drop_connection;
	}

	/* odd value: membership requested; value / 16: priority */
	if( ((int)conn[0] % 2) ==  1 )
		pend->status = Set_memb_session( pend->status );
	else
		pend->status = Clear_memb_session( pend->status );
	pend->priority = (int)conn[0] / 16 ;

	name_len = (int)conn[1];
	if( name_len > MAX_PRIVATE_NAME || name_len < 0 )
	{
		response = REJECT_ILLEGAL_NAME;
		send( pend->mbox, &response, sizeof(response), 0 );
		Alarm( SESSION, "Sess_accept_continue: len %d of private name does not fit (ret = %d) on mailbox %d\n",
			name_len, ret,
			pend->mbox );
		goto drop_connection;
	}

	/* reading private name */
	memset( pend->name, 0, MAX_PRIVATE_NAME+1 );

	if( name_len != 0 )
	{
		/* receive user name from client and validate it */
		ret = recv( pend->mbox, priv_user_name, MAX_PRIVATE_NAME, 0 );
		if( ret < 0 )
		{
			Alarm( SESSION, "Sess_accept_continue: reading private name failed on mailbox %d\n",
			       pend->mbox );
			goto drop_connection;
		}
		if( ret != name_len )
		{
			response = REJECT_ILLEGAL_NAME;
			send( pend->mbox, &response, sizeof(response), 0 );
			Alarm( SESSION, "Sess_accept_continue: len %d of private name does not fit (ret = %d) on mailbox %d\n",
			       name_len, ret, pend->mbox );
			goto drop_connection;
		}
		memcpy( pend->name, priv_user_name, name_len );

		/* checking if private name is legal: every character must lie
		 * strictly above '#' and not above '~' */
		legal_private_name = 1;
		for( i=0; i < name_len; i++ )
		{
			if( !( pend->name[i] > '#' && pend->name[i] <= '~' ) )
			{
				legal_private_name = 0;
				break;
			}
		}
		if( !legal_private_name )
		{
			response = REJECT_ILLEGAL_NAME;
			send( pend->mbox, &response, sizeof(response), 0 );
			Alarm( SESSION, "Sess_accept_continue: illegal private name %s on mailbox %d\n",
			       pend->name,
			       pend->mbox );
			goto drop_connection;
		}

		/* checking if private name is unique; the scan stops at the first
		 * listed name that does not compare below the new one, and
		 * tmp_ses is left there as the insertion point */
		unique_private_name = 1;
		for( tmp_ses = Sessions_head; tmp_ses != NULL; tmp_ses = tmp_ses->sort_next )
		{
			ret = strcmp( pend->name, tmp_ses->name );
			if( ret > 0 ) continue;

			unique_private_name = ( ret != 0 );
			break;
		}
		if( !unique_private_name )
		{
			response = REJECT_NOT_UNIQUE;
			send( pend->mbox, &response, sizeof(response), 0 );
			Alarm( SESSION, "Sess_accept_continue: non unique private name %s on mailbox %d\n",
			       pend->name,
			       pend->mbox );
			goto drop_connection;
		}
	}
	else
	{
		/* Assign a generated user name: "r", a random number, "-" and the
		 * mbox of the connection, cut to the size of newname.  Repeat
		 * until the name is not already in use. */
		char newname[MAX_PRIVATE_NAME];

		do
		{
			memset(newname, '\0', MAX_PRIVATE_NAME);
			rnum = (int) (9999.0*get_rand()/(RAND_MAX+1.0));
			snprintf(newname, MAX_PRIVATE_NAME, "r%u-%u", rnum, pend->mbox);
			memcpy( pend->name, newname, MAX_PRIVATE_NAME);

			/* same uniqueness scan as for client supplied names */
			unique_private_name = 1;
			for( tmp_ses = Sessions_head; tmp_ses != NULL; tmp_ses = tmp_ses->sort_next )
			{
				ret = strcmp( pend->name, tmp_ses->name );
				if( ret > 0 ) continue;

				unique_private_name = ( ret != 0 );
				break;
			}
		} while ( !unique_private_name );
	}

	/* set file descriptor back to blocking */
	ioctl_cmd = 0;
	ret = ioctl( pend->mbox, FIONBIO, &ioctl_cmd);

	/* Insert the new session just before the point we already
	 * found while checking unique private name... */
	sess_location = Sess_insert_new_session (tmp_ses, pend);
	new_ses = &Sessions[sess_location];

	Num_sessions++;
	GlobalStatus.num_sessions = Num_sessions;

	new_ses->status = Set_preauth_session( new_ses->status );

	Sess_hash_session (new_ses);

	/* OLD client library without authentication/authorization code */
	if ( lib_version < 31600 )
	{
		Acm_acp_fill_ops( &(new_ses->acp_ops) );

		/* If IP access control is enabled, then check it. */
		if ( Acm_auth_query_allowed("IP") )
		{
			void (*auth_open)(struct session_auth_info *);
			struct session_auth_info *sess_auth_p;

			sess_auth_p = Acm_auth_create_sess_info_forIP(new_ses->mbox);
			auth_open = Acm_auth_get_auth_client_connection_byname("IP");
			auth_open(sess_auth_p);
			return;
		}

		/* If no IP authentication enabled, then try to use NULL */
		if ( Acm_auth_query_allowed("NULL") )
			Sess_session_authorized(sess_location);
		else
			Sess_session_denied(sess_location);
		return;
	}

	/* Newer clients are told which methods are allowed: one length byte
	 * followed by the list itself. */
	allowed_auth_list = Acm_auth_get_allowed_list();
	list_len = strlen(allowed_auth_list);

	send( new_ses->mbox, &list_len, 1, 0 );
	send( new_ses->mbox, allowed_auth_list, list_len, 0 );

	/* If no AllowedAuthMethods are declared, reject all sessions.
	 * To maintain old behavior of allowing all, just add the NULL
	 * method to the allowed list and then all clients will be allowed.
	 */
	if (list_len == 0)
	{
		Sess_session_denied(sess_location);
		return;
	}

	/* Now wait for client reply */
	E_attach_fd( new_ses->mbox, READ_FD, Sess_recv_client_auth, 0, NULL, LOW_PRIORITY );
	E_attach_fd( new_ses->mbox, EXCEPT_FD, Sess_recv_client_auth, 0, NULL, LOW_PRIORITY );
	return;

drop_connection:
	close( pend->mbox );
}
859 
/*
 * Event callback invoked when a not-yet-authorized client answers the list of
 * allowed authentication methods.
 *
 * mbox     - mailbox (socket) of the connecting client.
 * dummy    - event code, unused.
 * dummy_p  - event data, unused.
 *
 * Reads one fixed-size block of requested method names (MAX_AUTH_METHODS
 * slots of MAX_AUTH_NAME bytes each). The session is denied if the read
 * fails, is short, or names a method that is not allowed. Otherwise the
 * first required authentication method is started; its result is expected
 * to come back through Sess_session_report_auth_result().
 */
static void    Sess_recv_client_auth(mailbox mbox, int dummy, void *dummy_p)
{
        int         ret, i, ioctl_cmd, ses;
        char        auth_name[MAX_AUTH_NAME * MAX_AUTH_METHODS];
        void        (*auth_open)(struct session_auth_info *);
        struct session_auth_info *sess_auth_p;

        ses = Sess_get_session_index(mbox);
        if( ses < 0 || ses >= MAX_SESSIONS ) {
            Alarm( PRINT, "Sess_recv_client_auth: Illegal mbox %d for receiving client auth. Cannot deny or allow\n", mbox);
            return;
        }
        if (!Is_preauth_session(Sessions[ses].status) )
        {
                Alarm( EXIT, "Sess_recv_client_auth: BUG! Session is already authorized (status 0x%x)\n", Sessions[ses].status);
        }

        /* Only one reply is expected, so stop watching the mailbox here. */
        E_detach_fd(mbox, READ_FD);
        E_detach_fd(mbox, EXCEPT_FD);

        /* set file descriptor to non blocking */
        ioctl_cmd = 1;
        ret = ioctl( mbox, FIONBIO, &ioctl_cmd);

        /* FIXME: Support partial reads by storing the portion read so far and requeuing */
        ret = recv( mbox, auth_name, MAX_AUTH_NAME * MAX_AUTH_METHODS, 0 );
        if( ret < 0 )
        {
                Alarm( SESSION, "Sess_recv_client_auth: reading auth string failed on mailbox %d\n", mbox );
                Sess_session_denied(ses);
                return;
        }
        if( ret < (MAX_AUTH_NAME * MAX_AUTH_METHODS) )
        {
                Alarm( SESSION, "Sess_recv_client_auth: reading auth string SHORT on mailbox %d\n", mbox );
                Sess_session_denied(ses);
                return;
        }

        /* set file descriptor back to blocking */
        ioctl_cmd = 0;
        ret = ioctl( mbox, FIONBIO, &ioctl_cmd);

        /* The block comes straight from the client and need not contain any
         * terminator. Force one at the end of every slot so that the %s prints
         * and name lookups below can never run past a slot or the buffer.
         */
        for ( i = 0; i < MAX_AUTH_METHODS; i++ )
        {
                auth_name[(i + 1) * MAX_AUTH_NAME - 1] = '\0';
        }

        /* An empty slot ends the list; never scan more slots than were received. */
        for ( i = 0; i < MAX_AUTH_METHODS && auth_name[i * MAX_AUTH_NAME] != '\0'; i++ )
        {
                Alarm( SESSION, "Sess_recv_client_auth: Client requested %s type authentication\n", &auth_name[i * MAX_AUTH_NAME]);
                if ( !Acm_auth_query_allowed(&auth_name[i * MAX_AUTH_NAME]) )
                {
                        Alarm( SESSION, "Sess_recv_client_auth: received non-allowed auth method %s, closing session %d on mailbox %d\n", &auth_name[i * MAX_AUTH_NAME], ses, mbox);
                        Sess_session_denied(ses);
                        return;
                }
        }
        /* Register default permit all ops access control policy. */
        Acm_acp_fill_ops( &(Sessions[ses].acp_ops) );

        sess_auth_p = Acm_auth_create_sess_info(mbox, auth_name);
        if ( NULL == sess_auth_p )
        {
                Alarm( SESSION, "Sess_recv_client_auth: no valid auth_methods set or received: auth method %s, closing session %d on mailbox %d\n", auth_name, ses, mbox);
                Sess_session_denied(ses);
                return;
        }
        auth_open = Acm_auth_get_auth_client_connection( sess_auth_p->required_auth_methods[0] );
        if (auth_open == NULL)
        {
                Alarm(PRINT, "Sess_recv_client_auth: Illegal auth_method_id (%d) tried\n", sess_auth_p->required_auth_methods[0]);
                dispose( sess_auth_p );
                Sess_session_denied( ses );
                return;
        }
        auth_open( sess_auth_p );
}
935 
Sess_session_report_auth_result(struct session_auth_info * sess_auth_h,int authenticated_p)936 void    Sess_session_report_auth_result(struct session_auth_info *sess_auth_h, int authenticated_p )
937 {
938         int ses, authid, num_auths;
939         int permit_count, decision, i;
940         void        (*auth_open)(struct session_auth_info *);
941 
942         ses = Sess_get_session_index(sess_auth_h->mbox);
943 	if( ses < 0 || ses >= MAX_SESSIONS ) {
944             Alarm( PRINT, "Sess_session_report_auth_result: Illegal mbox %d for authentication. Cannot deny or allow\n", sess_auth_h->mbox);
945             dispose( sess_auth_h );
946             return;
947         }
948         num_auths = sess_auth_h->num_required_auths;
949         /* finished another method. See if entire set of checks is complete */
950         sess_auth_h->required_auth_results[sess_auth_h->completed_required_auths] = authenticated_p;
951         sess_auth_h->completed_required_auths++;
952         if (sess_auth_h->completed_required_auths < num_auths)
953         {
954                 authid = sess_auth_h->required_auth_methods[sess_auth_h->completed_required_auths];
955                 auth_open = Acm_auth_get_auth_client_connection(authid);
956                 if (auth_open == NULL)
957                 {
958                         Alarm(PRINT, "Sess_session_report_auth_result: Illegal auth_method_id (%d) tried\n", authid);
959                         dispose( sess_auth_h );
960                         Sess_session_denied( ses );
961                         return;
962                 }
963                 auth_open(sess_auth_h);
964                 return;
965         }
966         permit_count = 0;
967         for (i = 0; i < num_auths; i++)
968                 if ( sess_auth_h->required_auth_results[i] ) permit_count++;
969 
970         dispose( sess_auth_h );
971 
972         if ( permit_count < num_auths )
973         {
974                 /* session is denied if any authentication method fails */
975                 Sess_session_denied( ses );
976                 return;
977         }
978         decision = Sessions[ses].acp_ops.open_connection(Sessions[ses].name);
979         if (decision != ACM_ACCESS_ALLOWED)
980         {
981                 Sess_session_denied( ses );
982                 return;
983         }
984         /* NOTE: Here and in the backwards compat support in Sess_accept_cont are the only places
985          * that should authorize connections
986          */
987         Sess_session_authorized( ses );
988 }
Sess_session_denied(int ses)989 void    Sess_session_denied(int ses)
990 {
991         char response;
992 
993         if (!Is_preauth_session(Sessions[ses].status) )
994         {
995                 Alarm( EXIT, "Sess_session_denied: BUG! Session is already authorized (status 0x%x)\n", Sessions[ses].status);
996         }
997 
998         response = REJECT_AUTH;
999         send( Sessions[ses].mbox, &response, sizeof(response), 0 );
1000         Alarm( SESSION, "Sess_session_denied: Authorization denied for %s on mailbox %d\n",
1001                Sessions[ses].name,
1002                Sessions[ses].mbox );
1003         close( Sessions[ses].mbox );
1004 
1005         Sess_unhash_session (&Sessions[ses]);
1006 	Sess_remove_session (&Sessions[ses]);
1007         Num_sessions--;
1008         GlobalStatus.num_sessions = Num_sessions;
1009 
1010         return;
1011 }
1012 
Sess_session_authorized(int ses)1013 void    Sess_session_authorized(int ses)
1014 {
1015         char        ip[16];
1016         char        response;
1017         unsigned int    name_len;
1018         char	private_group_name[MAX_GROUP_NAME];
1019 
1020         if (!Is_preauth_session(Sessions[ses].status) )
1021         {
1022                 Alarm( EXIT, "Sess_session_authorized: BUG! Session is already authorized (status 0x%x)\n", Sessions[ses].status);
1023         }
1024 
1025         /*
1026          * accept message looks like:
1027          *
1028          * byte - ACCEPT_SESSION code
1029          * byte - version of spread
1030          * byte - subversion of spread
1031          * (optional) byte - patch version of spread (only if library is 3.15.0 or greater)
1032          * byte - len of name
1033          * len bytes - name
1034          *
1035          */
1036         Sessions[ses].num_mess = 0;
1037         response = ACCEPT_SESSION;
1038         send( Sessions[ses].mbox, &response, 1, 0 );
1039 
1040         response = SP_MAJOR_VERSION;
1041         send( Sessions[ses].mbox, &response, 1, 0 );
1042         response = SP_MINOR_VERSION;
1043         send( Sessions[ses].mbox, &response, 1, 0 );
1044 
1045         if (Sessions[ses].lib_version[0]*10000 + Sessions[ses].lib_version[1]*100 + Sessions[ses].lib_version[2] >= 31500)
1046         {
1047                 response = SP_PATCH_VERSION;
1048                 send( Sessions[ses].mbox, &response, 1, 0 );
1049         }
1050 
1051         sprintf(private_group_name, "#%s#%s", Sessions[ses].name, My.name );
1052         name_len = strlen( private_group_name );
1053         /* sending the len of the private group in one byte */
1054         response = name_len;
1055         send( Sessions[ses].mbox, &response, 1, 0 );
1056         /* sending the private group name */
1057         send( Sessions[ses].mbox, private_group_name, name_len, 0 );
1058 
1059         E_attach_fd( Sessions[ses].mbox, READ_FD, Sess_read, Sessions[ses].type, NULL,
1060                      LOW_PRIORITY );
1061         E_attach_fd( Sessions[ses].mbox, EXCEPT_FD, Sess_read, Sessions[ses].type, NULL,
1062                      LOW_PRIORITY );
1063 
1064         Sessions[ses].status = Set_op_session( Sessions[ses].status );
1065         Sessions[ses].status = Clear_preauth_session( Sessions[ses].status );
1066 
1067         Prot_Create_Local_Session(&Sessions[ses]);
1068 
1069         Message_reset_current_location(&(Sessions[ses].read) );
1070         Message_reset_current_location(&(Sessions[ses].write) );
1071         Sessions[ses].read.in_mess_head = 1;
1072 
1073         Log_sess_connect( Sessions[ses].mbox, Sessions[ses].address,
1074                           Sessions[ses].name );
1075 
1076         Conf_id_to_str( Sessions[ses].address, ip );
1077         Alarm( SESSION, "Sess_session_authorized: Accepting from %s with private name %s on mailbox %d\n",
1078                ip,
1079                Sessions[ses].name,
1080                Sessions[ses].mbox );
1081 }
/*
 * Check a message header received from a client before any of the message
 * body is read.
 *
 * mbox      - mailbox the header was read from (not used by the checks).
 * ses       - index of the sending session in Sessions[].
 * head_size - size of the message header, used for the total size limit.
 * head_ptr  - header to check, already in local byte order. Its
 *             private_group_name is forcibly NUL-terminated as a side effect.
 *
 * Returns 0 if every check passes and -1 on the first failure, after
 * reporting the reason through Alarm().
 */
static  int     Sess_validate_read_header( mailbox mbox, int ses, int head_size, message_header *head_ptr)
{
        char            private_name[MAX_PRIVATE_NAME+1];
        char            proc_name[MAX_PROC_NAME];
        int             ret, type_bits, memb_bits;

        /* Disallow more then one message type being set or more then one Join/Leave/Kill being set */
        if ( ( (type_bits = count_bits_set( head_ptr->type, 0, 6)) > 1) ||
             ( (memb_bits = count_bits_set( head_ptr->type, 16, 19)) > 1) ||
             ( (type_bits + memb_bits) != 1 ) )
        {
                Alarm( SESSION, "Sess_validate_read_header: Message has illegal type field 0x%x\n", head_ptr->type);
                return(-1);
        }

        /* The name came off the wire; make sure it is a terminated string. */
        head_ptr->private_group_name[MAX_GROUP_NAME -1] = '\0';

        ret = G_private_to_names( head_ptr->private_group_name, private_name, proc_name );
        if( ret < 0 )
        {
                Alarm( SESSION, "Sess_validate_read_header: Message has illegal private_group_name (priv, proc)\n");
                return(-1);
        }
        /* The sender must name this daemon ... */
        if( strncmp( proc_name, My.name, MAX_PROC_NAME ) != 0 )
        {
                Alarm( SESSION, "Sess_validate_read_header: proc name %s is not my name %s\n",
                       proc_name, My.name );
                return(-1);
        }
        /* ... and itself: a session may not send on behalf of another session. */
        if (strncmp(private_name , Sessions[ses].name, MAX_PRIVATE_NAME) )
        {
                Alarm( PRINT, "Sess_validate_read_header: Session %s trying to make session %s do something\n",
                       private_name, Sessions[ses].name );
                return(-1);
        }

        if ( (head_ptr->num_groups < 0) || (head_ptr->num_groups > MAX_GROUPS_PER_MESSAGE) )
        {
                /* The offending value is now actually printed (the format used to lack a conversion). */
                Alarm( SESSION, "Sess_validate_read_header: Message has negative or too large num_groups field %d\n", head_ptr->num_groups);
                return(-1);
        }
        /* Only the hint bits covered by these two masks may be set. */
        if ( head_ptr->hint & ~( 0x80000080 | 0x00ffff00 ) )
        {
                Alarm( SESSION, "Sess_validate_read_header: Message has illegal hint field 0x%x\n", head_ptr->hint);
                return(-1);
        }

        if ( (head_ptr->data_len < 0 ) || ( head_ptr->data_len > MAX_MESSAGE_BODY_LEN) )
        {
                Alarm( SESSION, "Sess_validate_read_header: Message has negative or too large data_len %d\n", head_ptr->data_len);
                return(-1);
        }
        /* Group names and user data together must fit in the body next to the header. */
        if ( (head_ptr->data_len + MAX_GROUP_NAME * head_ptr->num_groups) > (MAX_MESSAGE_BODY_LEN - head_size ) )
        {
                Alarm( SESSION, "Sess_validate_read_header: Message + Groups is too large (%d + %d = %d). MAX size is: %d\n",
                       head_ptr->data_len, MAX_GROUP_NAME * head_ptr->num_groups,
                       head_ptr->data_len + MAX_GROUP_NAME * head_ptr->num_groups,
                       MAX_MESSAGE_BODY_LEN - head_size );
                return(-1);
        }
        /* Passed all checks, so valid */
        return( 0 );
}
1144 
/*
 * Event callback that reads one client message from a session mailbox and
 * hands it to the protocol layer.
 *
 * mbox  - mailbox (socket) that has data or an exception pending.
 * dummy - event code, unused.
 * d2    - event data, unused.
 *
 * The read is resumable: the socket is switched to non-blocking mode around
 * each recv(), progress is kept in Sessions[ses].read, and the function
 * returns as soon as no more data is available, continuing on a later call.
 *
 * Steps:
 *   1. Read the message header, convert it to local byte order and validate
 *      it; an invalid header kills the session.
 *   2. Read the group names and data into the message's scatter elements,
 *      adding elements as needed.
 *   3. Run the session's access control checks for leave, join and regular
 *      messages; a refused message is turned into a reject message and
 *      delivered instead of being sent on.
 *   4. Create a down link for the message and queue it to the protocol. For
 *      a kill message the session's queued messages are disposed and the
 *      mailbox is closed first.
 */
static	void	Sess_read( mailbox mbox, int dummy, void *d2 )
{
	message_header	*head_ptr, *msg_head;
        message_obj     *msg;
        scatter         *scat;
	down_link	*down_ptr;
        message_link    *mess_link;
	int		packet_index, byte_index, to_read;
	int             len, remain, ret;
        int             head_size, data_frag_len;
        int             ses, ioctl_cmd;
        char            *head_cbuf;
#if 0
#ifndef ARCH_SCATTER_NONE
static  struct  msghdr  msgh;
#endif  /* ARCH_SCATTER_NONE */
#endif  /* 0 */
#if 0
        /* we currently don't use recvmsg */
#ifndef ARCH_SCATTER_NONE
	msgh.msg_name    = (caddr_t) 0;
	msgh.msg_namelen = 0;
	msgh.msg_iov     = (struct iovec *)scat->elements;
	msgh.msg_iovlen  = scat->num_elements;
#endif  /* ARCH_SCATTER_NONE */

#ifdef ARCH_SCATTER_CONTROL
	msgh.msg_control = (caddr_t) 0;
	msgh.msg_controllen = 0;
#endif /* ARCH_SCATTER_CONTROL */
#ifdef ARCH_SCATTER_ACCRIGHTS
	msgh.msg_accrights = (caddr_t) 0;
	msgh.msg_accrightslen = 0;
#endif /* ARCH_SCATTER_ACCRIGHTS */
#endif /* 0 */

        ses = Sess_get_session_index(mbox);
	if( ses < 0 || ses >= MAX_SESSIONS ) {
            Alarm( PRINT, "Sess_read: Illegal mbox %d for read\n", mbox);
            return;
        }
        /* A NULL read_mess means no message is in progress; start a new one. */
        if (Sessions[ses].read_mess == NULL)
                Sessions[ses].read_mess = Message_new_message();

        msg = Sessions[ses].read_mess;
	head_ptr = Message_get_message_header(msg);
        head_cbuf = (char *) head_ptr;
        head_size = Message_get_header_size();

        /* set file descriptor to non blocking */
	ioctl_cmd = 1;
	ret = ioctl( mbox, FIONBIO, &ioctl_cmd);

        if ( Sessions[ses].read.in_mess_head == 1 )
        {
                /* read up to size of message_header, continuing after any
                 * header bytes already received on an earlier call */
                len = Sessions[ses].read.cur_byte;
                remain = sizeof(message_header) - len;
                ret = recv( mbox, (char *) &head_cbuf[len], remain, 0 );
                if( ret  == remain )
                {
                        Sessions[ses].read.cur_byte += ret;
                        Message_set_location_begin_body(&(Sessions[ses].read) );
                } else  if (ret > 0 ) {
                        /* partial header: remember progress, restore blocking
                         * mode and wait for the next read event */
                        Sessions[ses].read.cur_byte += ret;
                        ioctl_cmd = 0;
                        ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
                        return;
                } else {
                        /* error reading */
                        if ( (ret == -1) && ( (sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK) ) ) {
                                /* nothing available right now; try again later */
                                ioctl_cmd = 0;
                                ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
                                return;
                        }
                        Alarm( SESSION, "Sess_read: failed receiving header on session %d: ret %d: error: %s \n", mbox, ret, sock_strerror(sock_errno) );
                        Sess_kill( mbox );
                        /* NOTE(review): this ioctl runs after Sess_kill(); presumably the
                         * descriptor is already closed by then -- confirm it is harmless. */
                        ioctl_cmd = 0;
                        ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
                        return;
                }
                /* When we get here we have a complete header */
                ioctl_cmd = 0;
                ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);

                /* Flipping message header to my form if needed */
                if( !Same_endian( head_ptr->type ) )
                {
                        Flip_mess( head_ptr );
                }
                /* Setting endian to my endian on the header */
                head_ptr->type = Set_endian( head_ptr->type );

                /* Validate all fields */
                Alarm( SESSION, "Sess_read: Message has type field 0x%x\n", head_ptr->type);
                ret = Sess_validate_read_header( mbox, ses, head_size, head_ptr);
                if (ret < 0 )
                {
                        /* invalid header */
                        Sess_kill(mbox);
                        return;
                }
        } /* finished reading and validating  header */

	/*
	 * to do recvmsg, but need to trick with the starting AFTER the header
	 * on the first packet, and then to return the big_scatter to original
	 * form. read at *most* head_ptr->data_len (set scatter lengths accordingly
	 * everytime here from scratch!
	 *
	 * ret = recvmsg( mbox, &msg, 0 );
	 * if( ret <=0 )
	 * {
	 * 	Alarm( SESSION, "Sess_read: failed receiving message on session %d\n", mbox );
	 * 	Sess_kill( mbox );
	 * 	return;
	 * }
	 */

	/* read the rest of the message if needed, reserving room at the beginning
         * of the first fragment(scat buf) for the message header and the lts and seq fields. */

        /* enable non-blocking io */
	ioctl_cmd = 1;
	ret = ioctl( mbox, FIONBIO, &ioctl_cmd);

        data_frag_len = Message_get_data_fragment_len();
        scat = Message_get_data_scatter(msg);
        /* bytes of group names plus data still missing from this message */
	remain = ( head_ptr->data_len + MAX_GROUP_NAME*head_ptr->num_groups )  - Sessions[ses].read.total_bytes;
	for(  ; remain > 0; remain -= ret )
	{
		packet_index = Sessions[ses].read.cur_element;
		byte_index   = Sessions[ses].read.cur_byte;
                if (packet_index >= scat->num_elements)
                {
                        /* We are beginning a new fragment -- so allocate it */
                        assert(byte_index == 0);
                        Message_add_scat_element(msg);
                }
                /* read no further than the end of the current fragment or of the message */
		to_read = ( data_frag_len - byte_index );
		if( to_read > remain ) to_read = remain;
		ret = recv( mbox, &scat->elements[packet_index].buf[byte_index],
				to_read, 0 );
                if( ret  == to_read )
                {
                        /* got everything asked for: continue at the start of the next fragment */
                        Sessions[ses].read.cur_byte = 0;
                        Sessions[ses].read.cur_element++;
                        Sessions[ses].read.total_bytes += ret;
                } else  if (ret > 0 ) {
                        /* partial read: remember progress and wait for the next read event */
                        Sessions[ses].read.cur_byte += ret;
                        Sessions[ses].read.total_bytes += ret;
                        ioctl_cmd = 0;
                        ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
                        return;
                } else {
                        if ( (ret == -1) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) ) {
                                /* nothing available right now; try again later */
                                ioctl_cmd = 0;
                                ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
                                return;
                        }
			Alarm( SESSION, "Sess_read: failed receiving message on session %d, ret is %d: error: %s\n", mbox, ret, sock_strerror(sock_errno) );
			Alarm( SESSION, "Sess_read: failed recv msg more info: len read: %d, remain: %d, to_read: %d, pkt_index: %d, b_index: %d, scat_nums: %d\n",Sessions[ses].read.total_bytes, remain, to_read, packet_index, byte_index, scat->num_elements );
			Sess_kill( mbox );
                        /* NOTE(review): as in the header path, this ioctl follows Sess_kill() -- confirm. */
                        ioctl_cmd = 0;
                        ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
			return;
		}
	}

        /* We now have a complete message */
        ioctl_cmd = 0;
        ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);

        /* reset active read_mess to empty; from here on the message is only
         * reachable through the local msg pointer */
        Message_reset_current_location(&(Sessions[ses].read));
        Sessions[ses].read.in_mess_head = 1;
        Sessions[ses].read_mess = NULL;

        Message_element_len_fixup(msg);

#ifdef  PROBE_LATENCY
        /* Latency probing build only: write the current time into the message
         * at the offset stored at the start of its data, then advance that
         * offset past the timestamp. */
        if (Is_latency_mess( head_ptr->type ) )
        {
                int32u          initial_offset, htime_offset;
                int32u          *p_time_offset;
                sp_time         cur_time, ncur_time;

                initial_offset = MAX_GROUP_NAME*head_ptr->num_groups + head_size;
                p_time_offset = (int32u *) &msg->body.elements[0].buf[initial_offset];
                htime_offset = ntohl(*p_time_offset);
                Alarm(SESSION, "Sess_read: Msg Data at %d with time_offset %u \n", initial_offset, htime_offset);
                cur_time = E_get_time();
                ncur_time.sec = htonl(cur_time.sec);
                ncur_time.usec = htonl(cur_time.usec);
                memcpy(&(msg->body.elements[0].buf[htime_offset + initial_offset]), &ncur_time, sizeof(sp_time));
                Alarm(SESSION, "Sess_read: timestamped time (%d, %d) in byte %d of message\n", cur_time.sec, cur_time.usec, htime_offset);
                *p_time_offset = htonl(htime_offset + sizeof(sp_time) );
        }
#endif  /* PROBE_LATENCY */

        /* Do ACM access control checks */
        /* Note, disconnects (Is_kill_mess) are not limited. Someone can always cut themselves off */
        if ( Is_leave_mess( head_ptr->type ) )
        {
                char *groups_ptr;
                int decision;
                groups_ptr = Message_get_first_group( msg );
                decision = Sessions[ses].acp_ops.leave_group( head_ptr->private_group_name, groups_ptr, NULL);
                if (decision != ACM_ACCESS_ALLOWED)
                {
                        /* replace the LEAVE bit with CAUSED_BY_LEAVE before building the reject */
                        head_ptr->type = (head_ptr->type & ~LEAVE_MESS);
                        head_ptr->type |= CAUSED_BY_LEAVE;
                        Sess_create_reject_message( msg );
                        Sess_deliver_reject( msg );
                        return;
                }
        }
        if ( Is_join_mess( head_ptr->type ) )
        {
                char *groups_ptr;
                int decision;
                groups_ptr = Message_get_first_group( msg );

                /* Make sure we don't let a join happen if the limit has been reached. */
		if( G_get_num_local( groups_ptr ) == MAX_LOCAL_GROUP_MEMBERS ) {
                        Alarm( PRINT, "Sess_read: Attempt by session %s to join group %s "
                               "failed: too many local members.\n", head_ptr->private_group_name, groups_ptr );
                        head_ptr->type = (head_ptr->type & ~JOIN_MESS);
                        head_ptr->type |= CAUSED_BY_JOIN;
                        Sess_create_reject_message( msg );
                        Sess_deliver_reject( msg );
                        return;
                }

                decision = Sessions[ses].acp_ops.join_group( head_ptr->private_group_name, groups_ptr, NULL);
                if (decision != ACM_ACCESS_ALLOWED)
                {
                        /* replace the JOIN bit with CAUSED_BY_JOIN before building the reject */
                        head_ptr->type = (head_ptr->type & ~JOIN_MESS);
                        head_ptr->type |= CAUSED_BY_JOIN;
                        Sess_create_reject_message( msg );
                        Sess_deliver_reject( msg );
                        return;
                }
        }
        if ( Is_only_regular_mess( head_ptr->type ) )
        {
                char *groups_ptr;
                char target_groups[MAX_GROUPS_PER_MESSAGE][MAX_GROUP_NAME];
                int decision, num_p2p_dest;
                groups_ptr = Message_get_groups_array( msg );
                /* collect the destinations that are private groups (names starting with '#') */
                num_p2p_dest = Sess_get_p2p_dests(head_ptr->num_groups, (char (*)[MAX_GROUP_NAME])groups_ptr, target_groups);
                if (num_p2p_dest)
                {
                        decision = Sessions[ses].acp_ops.p2p_send( head_ptr->private_group_name, num_p2p_dest, target_groups, head_ptr->type,  ( (head_ptr->hint >> 8) & 0x0000ffff) );
                        if (decision != ACM_ACCESS_ALLOWED)
                        {
                                Sess_create_reject_message( msg );
                                Sess_deliver_reject( msg );
                                return;
                        }
                }
                /* at least one destination is not a private group: also check
                 * the multicast policy, passing the full destination list */
                if (head_ptr->num_groups > num_p2p_dest)
                {
                        decision = Sessions[ses].acp_ops.mcast_send( head_ptr->private_group_name, head_ptr->num_groups, (char (*)[MAX_GROUP_NAME])groups_ptr, head_ptr->type, ( (head_ptr->hint >> 8) & 0x0000ffff) );
                        if (decision != ACM_ACCESS_ALLOWED)
                        {
                                Sess_create_reject_message( msg );
                                Sess_deliver_reject( msg );
                                return;
                        }
                }
        }

	/* create new down_link and big_scatter */
        down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), mbox, 0);
        if (down_ptr == NULL)
        {
                Alarm( SESSION, "Sess_read: Session has illegal message type 0x%x\n", head_ptr->type);
                Sess_kill( mbox );
                return;
        }
        down_ptr->mess = msg;
        msg_head = Message_get_message_header(down_ptr->mess);

        if (Is_kill_mess(msg_head->type) )
        {
                /* We are going to overwrite the group that is sent from the library.
                 * it is not needed for kill messages, so we shrink the data field to ignore it
                 */
                /* NOTE(review): read.total_bytes is used here (and in the final Alarm
                 * below) after Message_reset_current_location() was called on the same
                 * structure above -- confirm the reset leaves total_bytes intact,
                 * otherwise this length is stale. */
                len = Message_kill_mess_fixup(down_ptr->mess, Sessions[ses].read.total_bytes - MAX_GROUP_NAME, mbox);

                /* A bug in both 3.13 and 4 I think is that if we get a DISCONNECT message
                 * from the client and we process and deliver that before discovering the
                 * closed socket ourselves and calling Sess_kill(), then the session
                 * is in the wrong state and we will crash when we try to finish delivery.
                 */
                Log_sess_disconnect( Sessions[ses].mbox, Sessions[ses].address, Sessions[ses].name,
                                     Sessions[ses].num_mess );
                /* clear his structure */
                while( Sessions[ses].num_mess > 0 )
                {
                        mess_link = Sessions[ses].first;
                        Sessions[ses].first = Sessions[ses].first->next;
                        Sess_dispose_message( mess_link );
                        Sessions[ses].num_mess--;
                }

                /* close the mailbox and mark it unoperational */
                E_dequeue( Sess_badger, mbox, NULL );
                E_detach_fd( mbox, READ_FD );
                E_detach_fd( mbox, EXCEPT_FD );
                close( mbox );
                /* the mailbox is closed but the entry still points to it */
                Sessions[ses].status = Clear_op_session( Sessions[ses].status );
                Alarm( SESSION, "Sess_read: disconnecting session %s ( mailbox %d )\n",Sessions[ses].name, mbox );
        }

	Alarm( SESSION, "Sess_read: queueing message of type %d with len %d to the protocol\n",
		down_ptr->type, Sessions[ses].read.total_bytes );
	Prot_new_message( down_ptr, Sessions[ses].down_queue );
}
1466 
/*
 * Copy the private-group destinations out of a message's group list.
 *
 * num_groups - number of entries in groups.
 * groups     - destination group names, MAX_GROUP_NAME bytes each.
 * dests      - output array; receives, in order, every entry of groups whose
 *              name starts with '#'. Must have room for num_groups entries.
 *
 * Returns the number of entries written to dests.
 */
static  int     Sess_get_p2p_dests( int num_groups, char groups[][MAX_GROUP_NAME], char dests[][MAX_GROUP_NAME] )
{
        int     found = 0;
        int     g = 0;

        while( g < num_groups )
        {
                const char *candidate = groups[g];

                /* a leading '#' marks a private group */
                if( candidate[0] == '#' )
                {
                        memcpy( dests[found++], candidate, MAX_GROUP_NAME );
                }
                g++;
        }
        return( found );
}
1481 /* Take a message received from a client and change it into the form of
1482  * a reject message. Destination groups, user data, mess_type and type field
1483  * are all preserved to give the sender information about what message was
1484  * rejected.
1485  */
Sess_create_reject_message(message_obj * msg)1486 static  void    Sess_create_reject_message ( message_obj *msg )
1487 {
1488         message_header  *head_ptr;
1489         int32u          old_type;
1490 
1491         head_ptr = Message_get_message_header(msg);
1492 
1493         old_type = head_ptr->type;
1494         head_ptr->type = REJECT_MESS;
1495         head_ptr->type = Set_endian( head_ptr->type );
1496         /* If original message was SELF_DISCARD, then maintain that state */
1497         if (Is_self_discard( old_type) ) head_ptr->type |= SELF_DISCARD;
1498 
1499         Message_add_oldtype_to_reject( msg, old_type );
1500 
1501         Alarm( PRINT, "Sess_create_reject_mess: Created Reject for sender %s type 0x%x oldtype 0x%x for first group %s\n",
1502                head_ptr->private_group_name, head_ptr->type, old_type, Message_get_first_group( msg ) );
1503         return;
1504 }
1505 
/*
 * Wrap a reject message in a freshly allocated message link and pass it to
 * Sess_deliver_message().
 *
 * msg - the reject message to deliver.
 *
 * Failure to allocate the link is reported through Alarm( EXIT, ... ).
 */
static  void    Sess_deliver_reject( message_obj *msg )
{
        message_link    *link = new(MESSAGE_LINK);

        if (link != NULL ) {
                link->mess = msg;
                link->next = NULL;

                Sess_deliver_message( link );
                return;
        }
        Alarm(EXIT, "Sess_deliver_reject: Failed to allocate a new MESSAGE_LINK.\n");
}
1522 
/*
 * Sess_write: hand one message to the client of session index `ses`.
 *
 *   ses        index into Sessions[].
 *   mess_link  link holding the message to deliver.
 *   needed     in/out: zero on entry means mess_link itself has not been
 *              queued on any session yet.  If this call has to queue the
 *              message it either queues mess_link (and sets *needed to 1)
 *              or queues a new link made with Message_copy_message()
 *              (and increments *needed).
 *
 * Nothing is done for a non-operational session.  A session that already
 * holds MAX_SESSION_MESSAGES queued messages is killed instead.  When the
 * session queue is empty the message is sent immediately on the mailbox
 * in non-blocking mode; whatever could not be sent is queued and
 * Sess_badger is scheduled to finish the job.
 */
void	Sess_write( int ses, message_link *mess_link, int *needed )
{
	session		*sess;
	message_obj	*msg;
	message_header	*head_ptr;
	message_link	*queue_link;
	scatter		*scat;
	char		*first_data_ptr;
	int		first_data_len;
	int		bytes_total;
	int		bytes_sent;
	int		nonblock;
	int		ret;
	int		i;

	sess = &Sessions[ses];

	if( !Is_op_session( sess->status ) ) return;

	if( sess->num_mess >= MAX_SESSION_MESSAGES )
	{
		Alarm( SESSION, "Sess_write: killing mbox %d for not reading\n", sess->mbox );
		Sess_kill( sess->mbox );
		return;
	}

	/* Try to drain the existing backlog before handling this message. */
	if( sess->num_mess > 0 ) Sess_badger( sess->mbox, NULL );

	msg = mess_link->mess;
	Obj_Inc_Refcount(msg);
	scat = Message_get_data_scatter(msg);

	/* Everything that must reach the client: the data scatter plus the header part. */
	bytes_total = 0;
	i = 0;
	while( i < scat->num_elements )
	{
		bytes_total += scat->elements[i].len;
		i++;
	}
	bytes_total += Message_get_non_body_header_size();

	head_ptr = Message_get_message_header(msg);
#ifdef  PROBE_LATENCY
	if( Is_latency_mess( head_ptr->type ) )
	{
		int32u		groups_len, stamp_offset;
		int32u		*stamp_offset_ptr;
		sp_time		now, net_now;

		groups_len = MAX_GROUP_NAME*head_ptr->num_groups;
		stamp_offset_ptr = (int32u *) &msg->body.elements[0].buf[groups_len];
		stamp_offset = ntohl(*stamp_offset_ptr);
		Alarm(SESSION, "Sess_write: Msg Data at %d with timeoffset %u\n", groups_len, stamp_offset);
		now = E_get_time();
		net_now.sec = htonl(now.sec);
		net_now.usec = htonl(now.usec);
		memcpy(&(msg->body.elements[0].buf[stamp_offset + groups_len]), &net_now, sizeof(sp_time));
		Alarm(SESSION, "Sess_write: timestamped time (%d, %d) in byte %d of message\n", now.sec, now.usec, stamp_offset);
		*stamp_offset_ptr = htonl(stamp_offset + sizeof(sp_time) );
	}
#endif

	bytes_sent = 0;
	if( sess->num_mess == 0 )
	{
		/* Nothing is queued ahead of us: send right away, non-blocking. */
		nonblock = 1;
		ret = ioctl( sess->mbox, FIONBIO, &nonblock );

		/* The message header goes first and is sent on its own. */
		ret = send( sess->mbox, (char *) head_ptr, sizeof(message_header), 0 );
		if( ret > 0 ) bytes_sent += ret;
		if( ret == sizeof(message_header) )
		{
			/* Then the first data region, which is also handled specially. */
			first_data_ptr = Message_get_first_data_ptr(msg);
			first_data_len = Message_get_first_data_len(msg);
			ret = send( sess->mbox, first_data_ptr, first_data_len, 0 );
			if( ret > 0 ) bytes_sent += ret;
			if( ret == first_data_len )
			{
				/* Finally every scatter element after the first. */
				for( i = 1; i < scat->num_elements; i++ )
				{
					ret = send( sess->mbox, scat->elements[i].buf,
						    scat->elements[i].len, 0 );
					if( ret > 0 ) bytes_sent += ret;
					if( ret != scat->elements[i].len ) break;
				}
			}
		}

		/* Put the descriptor back into blocking mode. */
		nonblock = 0;
		ret = ioctl( sess->mbox, FIONBIO, &nonblock );
	}

	if( bytes_sent < bytes_total )
	{
		/* Part (or all) of the message is still unsent: queue it. */
		if( *needed == 0 )
		{
			/* First session to keep it: queue the caller's link itself. */
			queue_link = mess_link;
			*needed = 1;
		}else{
			/* The caller's link is already queued elsewhere: queue a copy. */
			queue_link = new(MESSAGE_LINK);
			if( queue_link == NULL )
			{
				Alarm(EXIT, "Sess_write: Failed to allocate a new MESSAGE_LINK.\n");
				return;
			}
			queue_link->mess = Message_copy_message(msg);
			++*needed;
		}
		queue_link->next = 0;

		if( sess->num_mess != 0 )
		{
			/* A badger event is already pending for this session. */
			sess->last->next = queue_link;
			sess->last = queue_link;
		}else{
			sess->first = queue_link;
			sess->last = queue_link;
			/* Record how far into the message the direct send got. */
			Message_calculate_current_location( queue_link->mess, bytes_sent, &(sess->write) );

			/* Schedule the retry that will push out the remainder. */
			E_queue( Sess_badger, sess->mbox, NULL, Badger_timeout );
		}
		sess->num_mess++;
	}
	Message_Dec_Refcount(msg);
}
1663 
/*
 * Sess_badger: try to flush the queued messages of the session that owns
 * mailbox `mbox`.  `dummy` is unused.
 *
 * The mailbox is switched to non-blocking mode and messages are sent from
 * the head of the session queue, resuming at the position stored in the
 * session's `write` location.  Each fully sent message is unlinked and
 * disposed.  The first short or failed send stops the pass.  If messages
 * remain afterwards the function re-queues itself with Badger_timeout.
 */
static	void	Sess_badger( mailbox mbox, void *dummy )
{
	session		*sess;
	message_link	*done_link;
	message_obj	*msg;
	scatter		*scat;
	char		*head_bytes;
	int		ses;
	int		can_write;
	int		nonblock;
	int		offset, want;
	int		i;
	int		ret;

	Alarm( SESSION, "Sess_badger: for mbox %d\n", mbox );

	ses = Sess_get_session_index( mbox );
	if( ses < 0 || ses >= MAX_SESSIONS ) return;

	sess = &Sessions[ses];
	if( !Is_op_session( sess->status ) ) return;
	if( sess->num_mess <= 0 ) return;

	/* Send without blocking for the duration of this pass. */
	nonblock = 1;
	ret = ioctl( mbox, FIONBIO, &nonblock );

	can_write = 1;
	while( can_write && sess->num_mess > 0 )
	{
		msg = sess->first->mess;
		Obj_Inc_Refcount(msg);

		/* The message header is sent separately from the body;
		 * finish it first if it is still (partly) outstanding. */
		if( sess->write.in_mess_head == 1 )
		{
			head_bytes = (char *) Message_get_message_header(msg);
			offset = sess->write.cur_byte;
			want = sizeof(message_header) - offset;
			ret = send( mbox, &head_bytes[offset], want, 0 );
			if( ret != want )
			{
				/* Short or failed send: remember progress and stop. */
				if( ret > 0 ) sess->write.cur_byte = offset + ret;
				can_write = 0;
			}else{
				Message_set_location_begin_body( &(sess->write) );
			}
		}

		/* Header is out: continue with the body from the saved spot. */
		if( can_write )
		{
			scat = Message_get_data_scatter(msg);
			offset = sess->write.cur_byte;
			for( i = sess->write.cur_element; i < scat->num_elements; i++ )
			{
				want = scat->elements[i].len - offset;
				ret = send( mbox, &(scat->elements[i].buf[offset]), want, 0 );
				if( ret != want )
				{
					if( ret > 0 ) sess->write.cur_byte += ret;
					can_write = 0;
					break;
				}
				/* Element finished; the next one starts at byte 0. */
				sess->write.cur_byte = 0;
				sess->write.cur_element++;
				offset = 0;
			}
		}

		if( can_write )
		{
			/* Whole message delivered: unlink and release it. */
			done_link = sess->first;
			sess->first = done_link->next;
			sess->num_mess--;
			Message_reset_current_location( &(sess->write) );
			Sess_dispose_message( done_link );
		}
		Message_Dec_Refcount(msg);
	}

	/* Restore blocking mode on the descriptor. */
	nonblock = 0;
	ret = ioctl( mbox, FIONBIO, &nonblock );

	if( sess->num_mess > 0 ) E_queue( Sess_badger, mbox, NULL, Badger_timeout );
}
1750 
/*
 * Sess_kill: shut down the local session that owns mailbox `mbox`.
 *
 * Steps, in order:
 *   - build a KILL_MESS | SAFE_MESS message whose private group is
 *     "#<session name>#<My.name>" and submit it through the session's
 *     down_queue with Prot_new_message();
 *   - log the disconnect;
 *   - dispose every message still queued for the client and any
 *     partially read message;
 *   - dequeue the pending Sess_badger event, detach the descriptor from
 *     the event system, close it, and clear the operational flag.
 *
 * The Sessions[] entry itself is left in place here.
 *
 * An unknown mailbox is reported and ignored.  A session that is neither
 * operational nor pre-authorization has already been through this
 * function: that case is reported and the function returns, so that no
 * second kill message is sent and the already closed descriptor is not
 * closed again.  A pre-authorization session is a fatal error.
 */
static	void	Sess_kill( mailbox mbox )
{
	int		ses;
	message_link	*mess_link;
	message_obj	*kill_mess;
	message_header	*kill_head;
	down_link	*down_ptr;
	int		head_size;
	char		private_send_group[MAX_GROUP_NAME];

	head_size = Message_get_header_size();
	ses = Sess_get_session_index(mbox);
	if( ses < 0 || ses >= MAX_SESSIONS ) {
		Alarm( PRINT, "Sess_kill: Illegal mbox %d for killing. Cannot cleanup\n", mbox);
		return;
	}
	if( !Is_op_session( Sessions[ses].status ) )
	{
		if ( !Is_preauth_session( Sessions[ses].status ) )
		{
			Alarm( PRINT,
			       "Sess_kill: session %s with mailbox %d is already killed (status %d)\n",
			       Sessions[ses].name, mbox, Sessions[ses].status );
			/* The teardown below already ran for this session;
			 * repeating it would send a duplicate kill message and
			 * close a descriptor number that is no longer ours. */
			return;
		}
		Alarm( EXIT,
		       "Sess_kill: BUG! session %s with mailbox %d killed before authentication and authorization completed (status 0x%x)\n",
		       Sessions[ses].name, mbox, Sessions[ses].status);
	}

	/* send a message to kill this session */
	sprintf( private_send_group, "#%s#%s", Sessions[ses].name, My.name );
	kill_mess = Message_create_message(KILL_MESS | SAFE_MESS, private_send_group);
	kill_head = Message_get_message_header(kill_mess);
	Message_kill_mess_fixup(kill_mess, head_size, mbox );

	down_ptr = Prot_Create_Down_Link(kill_mess, Message_get_packet_type(kill_head->type), mbox, 0);
	down_ptr->mess = kill_mess;

	/* the down link holds its own reference; drop the local one afterwards */
	Obj_Inc_Refcount(down_ptr->mess);
	Prot_new_message( down_ptr, Sessions[ses].down_queue );
	Message_Dec_Refcount(kill_mess);

	Log_sess_disconnect( Sessions[ses].mbox, Sessions[ses].address, Sessions[ses].name,
			     Sessions[ses].num_mess );

	/* throw away everything still queued for the client */
	while( Sessions[ses].num_mess > 0 )
	{
		mess_link = Sessions[ses].first;
		Sessions[ses].first = Sessions[ses].first->next;
		Sess_dispose_message( mess_link );
		Sessions[ses].num_mess--;
	}

	/* reset active read_mess to empty */
	Message_reset_current_location(&(Sessions[ses].read));
	Sessions[ses].read.in_mess_head = 1;
	if (Sessions[ses].read_mess != NULL)
		Message_dispose_message( Sessions[ses].read_mess );
	Sessions[ses].read_mess = NULL;

	/* close the mailbox and mark it unoperational */
	E_dequeue( Sess_badger, mbox, NULL );
	E_detach_fd( mbox, READ_FD );
	E_detach_fd( mbox, EXCEPT_FD );
	close(mbox);
	/* the mailbox is closed but the entry still points to it */
	Sessions[ses].status = Clear_op_session( Sessions[ses].status );
	Alarm( SESSION, "Sess_kill: killing session %s ( mailbox %d )\n",Sessions[ses].name, mbox );
}
1818 
/*
 * Sess_deliver_message: dispatch one delivered message.
 *
 * After the header is endian-corrected, join, leave, kill and groups
 * messages are passed to their handler.  Any other message is a regular
 * message and is written to every target session with Sess_write():
 *   - a reject message targets only the session named by the header's
 *     private group name;
 *   - otherwise the targets are computed by G_analize_groups() from the
 *     message's group list.
 * For a SELF_DISCARD message whose sender belongs to this daemon, the
 * sender's own session is skipped.
 *
 * mess_link is disposed here unless a Sess_write() call queued it
 * (signalled through `needed`).
 */
void	Sess_deliver_message( message_link *mess_link )
{
static	int		target_sessions[MAX_SESSIONS];
	int		num_target_sessions;
	int		source_ses;
	message_header	*head_ptr;
	message_obj	*msg;
	char		*target_groups;
	char		private_name[MAX_PRIVATE_NAME+1];
	char		proc_name[MAX_PROC_NAME];
	int		handled;
	int		needed;
	int		ret;
	int		i;

	msg = mess_link->mess;
	Obj_Inc_Refcount(msg);
	head_ptr = Message_get_message_header(msg);

	Message_endian_correct_message_header(msg);

	/* Non-regular messages are consumed by their handler. */
	handled = 1;
	if( Is_join_mess( head_ptr->type ) )
	{
		Sess_handle_join( mess_link );
	}
	else if( Is_leave_mess( head_ptr->type ) )
	{
		Sess_handle_leave( mess_link );
	}
	else if( Is_kill_mess( head_ptr->type ) )
	{
		Sess_handle_kill( mess_link );
	}
	else if( Is_groups_mess( head_ptr->type ) )
	{
		G_handle_groups( mess_link );
	}
	else
	{
		handled = 0;
	}
	if( handled )
	{
		Message_Dec_Refcount(msg);
		return;
	}

	/* regular message */

	GlobalStatus.message_delivered++;

	/* Setting endian to my endian on the header */
	head_ptr->type = Set_endian( head_ptr->type );

	/* Keep the name buffers well defined even if name parsing fails. */
	private_name[0] = '\0';
	proc_name[0] = '\0';

	/* analyze message  groups to sessions  */
	if ( Is_reject_mess(head_ptr->type) ) {
		/* A reject goes back to its sender only.  If the private name
		 * does not parse, or no such session exists any more, there is
		 * no target at all: never hand a negative index to Sess_write. */
		num_target_sessions = 0;
		ret = G_private_to_names( head_ptr->private_group_name, private_name, proc_name );
		if( ret >= 0 )
		{
			target_sessions[0] = Sess_get_session( private_name );
			if( target_sessions[0] >= 0 ) num_target_sessions = 1;
		}
	} else {
		target_groups = Message_get_groups_array(msg);
		num_target_sessions = G_analize_groups( head_ptr->num_groups,
							(char (*)[MAX_GROUP_NAME])target_groups,
							target_sessions ) ;
	}

	/* if self_discard, sender is local and a target then eliminate sender from targets */
	/* NOTE(review): a reject that carries SELF_DISCARD resolves source_ses
	 * to the same session as its only target, so it is skipped below;
	 * confirm that this is the intended behaviour. */
	source_ses = -1;
	if( num_target_sessions > 0 && Is_self_discard( head_ptr->type ) )
	{
		ret = G_private_to_names( head_ptr->private_group_name, private_name, proc_name );
		if( ret >= 0 && strcmp( My.name, proc_name ) == 0 )
		{
			source_ses = Sess_get_session( private_name );
		}
	}

	needed = 0;
	for( i = 0; i < num_target_sessions ; i++ )
	{
		if( source_ses == target_sessions[i] ) continue; /* self_discard */
		Sess_write( target_sessions[i], mess_link, &needed );
	}

	Message_Dec_Refcount(msg);
	/* No session kept the link, so it is ours to release. */
	if( !needed ) Sess_dispose_message( mess_link );
}
1905 
/*
 * Sess_deliver_reg_memb: forward a regular membership and its id,
 * unchanged, to G_handle_reg_memb().
 */
void	Sess_deliver_reg_memb( configuration reg_memb, membership_id reg_memb_id )
{
	G_handle_reg_memb(reg_memb, reg_memb_id);
}
1910 
/*
 * Sess_deliver_trans_memb: forward a transitional membership and its id,
 * unchanged, to G_handle_trans_memb().
 */
void	Sess_deliver_trans_memb( configuration trans_memb, membership_id trans_memb_id )
{
	G_handle_trans_memb(trans_memb, trans_memb_id);
}
1915 
/*
 * Sess_handle_join: process a join message.  The sender's private group
 * name and the first group listed in the message are passed to
 * G_handle_join(), after which the link and its message are disposed.
 */
static	void	Sess_handle_join( message_link *mess_link )
{
	message_obj	*join_msg = mess_link->mess;
	message_header	*hdr = Message_get_message_header( join_msg );
	char		*joined_group = Message_get_first_group( join_msg );

	G_handle_join( hdr->private_group_name, joined_group );

	/* the message has been consumed */
	Sess_dispose_message( mess_link );
}
1929 
/*
 * Sess_handle_leave: process a leave message.  The sender's private group
 * name and the first group listed in the message are passed to
 * G_handle_leave(), after which the link and its message are disposed.
 */
static	void	Sess_handle_leave( message_link *mess_link )
{
	message_obj	*leave_msg = mess_link->mess;
	message_header	*hdr = Message_get_message_header( leave_msg );
	char		*left_group = Message_get_first_group( leave_msg );

	G_handle_leave( hdr->private_group_name, left_group );

	/* the message has been consumed */
	Sess_dispose_message( mess_link );
}
1942 
/*
 * Sess_handle_kill: process a delivered kill message.
 *
 * The header's private group name is split into a private name and a
 * daemon name; a name that cannot be split is a fatal error.  The groups
 * layer is informed through G_handle_kill().  If the daemon name equals
 * My.name and a matching session entry exists, that entry is unhashed,
 * its protocol state destroyed, and the entry removed; finding it still
 * operational is a fatal error.  Finally the protocol layer is informed
 * with Prot_kill_session() and the message is disposed.
 */
static	void	Sess_handle_kill( message_link *mess_link )
{
	char		private_name[MAX_PRIVATE_NAME+1];
	char		proc_name[MAX_PROC_NAME];
	message_header	*kill_head;
	session		*victim;
	int		ses;

	kill_head = Message_get_message_header( mess_link->mess );

	if( G_private_to_names( kill_head->private_group_name, private_name, proc_name ) < 0 )
	{
		Alarm( EXIT, "Sess_handle_kill: Illegal private name to kill %s\n",
		       kill_head->private_group_name );
	}

	G_handle_kill( kill_head->private_group_name );

	/* Only look for a session entry when the kill names this daemon. */
	ses = -1;
	if( strcmp( My.name, proc_name ) == 0 )
	{
		ses = Sess_get_session( private_name );
	}

	if( ses >= 0 )
	{
		victim = &Sessions[ses];

		/* Sess_kill must already have cleared the operational flag. */
		if( Is_op_session( victim->status ) )
		{
			Alarm( EXIT, "Sess_handle_kill: killing unkilled session bug!\n");
		}

		Sess_unhash_session( victim );
		Prot_Destroy_Local_Session( victim );
		Sess_remove_session( victim );

		Num_sessions--;
		GlobalStatus.num_sessions = Num_sessions;
	}

	Prot_kill_session( mess_link->mess );

	Sess_dispose_message( mess_link );
}
1979 
/*
 * Sess_dispose_message: dispose the message carried by mess_link, then
 * the link itself.
 */
void	Sess_dispose_message( message_link *mess_link )
{
	message_obj	*carried = mess_link->mess;

	Message_dispose_message( carried );
	dispose( mess_link );
}
1985 
Sess_get_session(char * name)1986 int	Sess_get_session( char *name )
1987 {
1988 	int	ret;
1989 	session	*ses;
1990 
1991 	for( ses = Sessions_head; ses; ses = ses->sort_next )
1992 	{
1993 		ret = strcmp( ses->name, name );
1994 		if( ret <  0 ) continue;
1995 		if( ret == 0 ) return( ses - Sessions );
1996 		if( ret >  0 ) return( -1 );
1997 	}
1998 	return( -1 );
1999 }
2000 
/*
 * Flip_mess: apply Flip_int32() in place to the three header fields
 * type, num_groups and data_len.  No other header field is touched.
 */
void	Flip_mess( message_header *head_ptr )
{
	/* The three conversions are independent of each other. */
	head_ptr->data_len = Flip_int32(head_ptr->data_len);
	head_ptr->num_groups = Flip_int32(head_ptr->num_groups);
	head_ptr->type = Flip_int32(head_ptr->type);
}
2007 
2008